Source code for neurokernel.pattern

#!/usr/bin/env python

"""
Represent connectivity pattern using pandas DataFrame.
"""

from collections import OrderedDict
import itertools
import re

import networkx as nx
import numpy as np
import pandas as pd

from plsel import Selector, SelectorMethods
from pm import BasePortMapper

class Interface(object):
    """
    Container for a set of interfaces comprising ports.

    This class contains information about a set of interfaces comprising
    path-like identifiers and the attributes associated with them.
    By default, each port must have at least the following attributes;
    other attributes may be added:

    - interface - indicates which interface a port is associated with.
    - io - indicates whether the port receives input ('in') or
      emits output ('out').
    - type - indicates whether the port emits/receives spikes or
      graded potentials.

    All port identifiers in an interface must be unique. For two interfaces
    to be deemed compatible, they must contain the same port identifiers and
    their identifiers' 'io' attributes must be the inverse of each other
    (i.e., every 'in' port in one interface must be mirrored by an 'out' port
    in the other interface).

    Examples
    --------
    >>> i = Interface('/foo[0:4],/bar[0:3]')
    >>> i['/foo[0:2]', 'interface', 'io', 'type'] = [0, 'in', 'spike']
    >>> i['/foo[2:4]', 'interface', 'io', 'type'] = [1, 'out', 'spike']

    Attributes
    ----------
    data : pandas.DataFrame
        Port attribute data.
    index : pandas.MultiIndex
        Index of port identifiers.

    Parameters
    ----------
    selector : str, unicode, or sequence
        Selector string (e.g., 'foo[0:2]') or sequence of token sequences
        (e.g., [['foo', (0, 2)]]) describing the port identifiers comprised
        by the interface.
    columns : list, default = ['interface', 'io', 'type']
        Data column names.

    See Also
    --------
    plsel.SelectorMethods
    """

    def __init__(self, selector='', columns=['interface', 'io', 'type']):
        """
        Build the port index from `selector` and allocate empty attributes.

        Raises
        ------
        AssertionError
            If `columns` lacks any of 'interface', 'io', 'type', or if the
            selector is reported as ambiguous.
        ValueError
            If the selector expands to duplicate port identifiers.
        """

        # All ports in an interface must contain at least the following
        # attributes (the default list is only read here, never mutated):
        assert set(columns).issuperset(['interface', 'io', 'type'])
        self.sel = SelectorMethods()
        assert not(self.sel.is_ambiguous(selector))
        self.num_levels = self.sel.max_levels(selector)

        # Index levels are named by their integer position:
        names = list(range(self.num_levels))
        idx = self.sel.make_index(selector, names)
        self.__validate_index__(idx)
        self.data = pd.DataFrame(index=idx, columns=columns, dtype=object)

        # Dictionary containing mappers for different port types:
        self.pm = {}

    def __validate_index__(self, idx):
        """
        Raise an exception if the specified index will result in an invalid
        interface.

        Raises
        ------
        ValueError
            If `idx` contains duplicate entries.
        """

        if idx.duplicated().any():
            raise ValueError('Duplicate interface index entries detected.')

    def __getitem__(self, key):
        """
        Select rows by selector, optionally restricted to named columns.

        `key` is either a selector or a tuple whose first entry is a selector
        and whose remaining entries are column names.
        """

        if isinstance(key, tuple) and len(key) > 1:
            return self.sel.select(self.data[list(key[1:])], key[0])
        return self.sel.select(self.data, key)

    def _assignment_dict(self, key, value):
        """
        Convert an assigned value into a dict mapping column name to value.

        A scalar is assigned to every column named in `key`, or to the first
        data column if `key` names none; a dict is used as-is; any other
        iterable is paired with the target columns in order.

        Raises
        ------
        ValueError
            If `value` cannot be interpreted in any of the above ways.
        """

        if isinstance(key, tuple) and len(key) > 1:
            columns = key[1:]
            if np.isscalar(value):
                return {k: value for k in columns}
        else:
            columns = self.data.columns
            if np.isscalar(value):
                return {self.data.columns[0]: value}

        if isinstance(value, dict):
            return value
        if np.iterable(value) and len(value) <= len(columns):
            return dict(zip(columns, value))
        raise ValueError('cannot assign specified value')

    def __setitem__ambiguous__(self, key, value):
        """
        Slower assignment path used when the selector is ambiguous.

        Raises
        ------
        ValueError
            If the selector cannot be resolved against the stored index or
            `value` cannot be assigned.
        """

        selector = key[0] if isinstance(key, tuple) else key

        # Ensure that the specified selector can actually be used against the
        # Interface's internal DataFrame:
        try:
            idx = self.sel.get_index(self.data, selector,
                                     names=self.data.index.names)
        except ValueError:
            raise ValueError('cannot create index with '
                             'selector %s and column names %s'
                             % (selector, str(self.data.index.names)))

        # If the data specified is not a dict, convert it to a dict:
        data = self._assignment_dict(key, value)

        for k, v in data.items():
            self.data[k].ix[idx] = v

    def __setitem__(self, key, value):
        """
        Assign attribute values to the ports comprised by a selector.

        `key` is either a selector or a tuple whose first entry is a selector
        and whose remaining entries are column names. Assigning to an empty
        selector is a no-op.

        Raises
        ------
        AssertionError
            If the selector comprises more identifiers than there are rows.
        ValueError
            If `value` cannot be assigned.
        """

        selector = key[0] if isinstance(key, tuple) else key

        # Fall back to slower method if the selector is ambiguous:
        if self.sel.is_ambiguous(selector):
            self.__setitem__ambiguous__(key, value)
            return
        selector = Selector(selector)

        # Don't waste time trying to do anything if the selector is empty:
        if not selector.nonempty:
            return

        # If the number of specified identifiers doesn't exceed the size of
        # the data array, enlargement by specifying identifiers that are not
        # in the index will not occur:
        assert len(selector) <= len(self.data)

        # If the data specified is not a dict, convert it to a dict:
        data = self._assignment_dict(key, value)

        if selector.max_levels == 1:
            s = list(itertools.chain.from_iterable(selector.expanded))
        else:
            s = self.sel.pad_selector(selector.expanded,
                                      len(self.index.levshape))
        for k, v in data.items():
            self.data[k].ix[s] = v

    @property
    def index(self):
        """
        Interface index.
        """

        return self.data.index

    @index.setter
    def index(self, i):
        self.data.index = i

    @property
    def interface_ids(self):
        """
        Interface identifiers.
        """

        return set(self.data['interface'])

    @property
    def io_inv(self):
        """
        Returns new Interface instance with inverse input-output attributes.

        Returns
        -------
        i : Interface
            Interface instance whose 'io' attributes are the inverse of those
            of the current instance.
        """

        def flip(x):
            # Anything other than 'in'/'out' (e.g., unset) passes through:
            if x == 'in':
                return 'out'
            if x == 'out':
                return 'in'
            return x

        data_inv = self.data.copy()
        data_inv['io'] = data_inv['io'].apply(flip)
        return self.from_df(data_inv)

    @property
    def idx_levels(self):
        """
        Number of levels in Interface index.
        """

        if isinstance(self.data.index, pd.MultiIndex):
            return len(self.index.levels)
        return 1

    def clear(self):
        """
        Clear all ports in class instance.
        """

        self.data.drop(self.data.index, inplace=True)

    def data_select(self, f, inplace=False):
        """
        Restrict Interface data with a selection function.

        Returns an Interface instance containing only those rows whose data is
        passed by the specified selection function.

        Parameters
        ----------
        f : function
            Selection function called once with the Interface's DataFrame
            (columns may be accessed by name, e.g., ``x['io'] == 'in'``); its
            result is used to index the DataFrame's rows.
        inplace : bool, default=False
            If True, update and return the given Interface instance.
            Otherwise, return a new instance.

        Returns
        -------
        i : Interface
            Interface instance containing data selected by `f`.
        """

        assert callable(f)
        result = self.data[f(self.data)]
        if inplace:
            self.data = result
            return self
        return Interface.from_df(result)

    @classmethod
    def from_df(cls, df):
        """
        Create an Interface from a properly formatted DataFrame.

        Examples
        --------
        >>> import plsel, pattern
        >>> import pandas
        >>> idx = plsel.SelectorMethods.make_index('/foo[0:2]')
        >>> data = [[0, 'in', 'spike'], [1, 'out', 'gpot']]
        >>> columns = ['interface', 'io', 'type']
        >>> df = pandas.DataFrame(data, index=idx, columns=columns)
        >>> i = pattern.Interface.from_df(df)

        Parameters
        ----------
        df : pandas.DataFrame
            DataFrame with a MultiIndex and data columns 'interface',
            'io', and 'type' (additional columns may also be present).

        Returns
        -------
        i : Interface
            Generated Interface instance.

        Raises
        ------
        ValueError
            If the DataFrame's index is not a pandas index or contains
            duplicate entries.

        Notes
        -----
        The contents of the specified DataFrame instance are copied into the
        new Interface instance.
        """

        assert set(df.columns).issuperset(['interface', 'io', 'type'])
        if not isinstance(df.index, pd.Index):
            raise ValueError('invalid index type')

        if not len(df.index):
            ports = [()]
        elif isinstance(df.index, pd.MultiIndex):
            ports = df.index.tolist()
        else:
            # Single-level identifiers must be wrapped in 1-tuples:
            ports = [(s,) for s in df.index.tolist()]

        i = cls(ports, df.columns)
        i.data = df.copy()
        i.__validate_index__(i.index)
        return i

    @classmethod
    def from_csv(cls, file_name, **kwargs):
        """
        Create an Interface from a properly formatted CSV file.

        Parameters
        ----------
        file_name : str
            File name of CSV file containing interface data.
        kwargs : dict
            Options to pass to `DataFrame.from_csv()`

        Returns
        -------
        i : Interface
            Generated Interface instance.
        """

        df = pd.DataFrame.from_csv(file_name, **kwargs)
        return cls.from_df(df)

    @classmethod
    def from_dict(cls, d):
        """
        Create an Interface from a dictionary of selectors and data values.

        Examples
        --------
        >>> d = {'/foo[0]': [0, 'in', 'gpot'], '/foo[1]': [1, 'in', 'gpot']}
        >>> i = Interface.from_dict(d)

        Parameters
        ----------
        d : dict
            Dictionary that maps selectors to the data that should be
            associated with the corresponding ports. If a scalar, the data is
            assigned to the first attribute; if an iterable, the data is
            assigned to the attributes in order.

        Returns
        -------
        i : Interface
            Generated interface instance.
        """

        i = cls(','.join(d.keys()))
        for k, v in d.items():
            i[k] = v
        i.data.sort_index(inplace=True)
        return i

    @classmethod
    def from_graph(cls, g):
        """
        Create an Interface from a NetworkX graph.

        Examples
        --------
        >>> import networkx as nx
        >>> g = nx.Graph()
        >>> g.add_node('/foo[0]', interface=0, io='in', type='gpot')
        >>> g.add_node('/foo[1]', interface=0, io='in', type='gpot')
        >>> i = Interface.from_graph(g)

        Parameters
        ----------
        g : networkx.Graph
            Graph whose node IDs are path-like port identifiers. The node
            attributes are assigned to the ports.

        Returns
        -------
        i : Interface
            Generated interface instance.
        """

        assert isinstance(g, nx.Graph)
        return cls.from_dict(g.node)

    @classmethod
    def from_selectors(cls, sel, sel_in='', sel_out='',
                       sel_spike='', sel_gpot='', *sel_int_list):
        """
        Create an Interface instance from selectors.

        Parameters
        ----------
        sel : str, unicode, or sequence
            Selector describing all ports comprised by interface.
        sel_in : str, unicode, or sequence
            Selector describing the interface's input ports.
        sel_out : str, unicode, or sequence
            Selector describing the interface's output ports.
        sel_spike : str, unicode, or sequence
            Selector describing the interface's spiking ports.
        sel_gpot : str, unicode, or sequence
            Selector describing the interface's graded potential ports.
        sel_int_list : list of str, unicode, or sequence
            Selectors consecutively describing the ports associated with
            interface 0, interface 1, etc.

        Returns
        -------
        i : Interface
            Generated interface instance.
        """

        i = cls(sel)
        i[sel_in, 'io'] = 'in'
        i[sel_out, 'io'] = 'out'
        i[sel_spike, 'type'] = 'spike'
        i[sel_gpot, 'type'] = 'gpot'
        for n, sel_int in enumerate(sel_int_list):
            i[sel_int, 'interface'] = n
        return i

    def _wrap_ports(self, df, tuples):
        """
        Package a filtered DataFrame as a list of tuples or an Interface.

        `df` may be None (the filter failed), in which case an empty list or
        an empty Interface is returned.
        """

        if tuples:
            return [] if df is None else df.index.tolist()
        return Interface() if df is None else self.from_df(df)

    def _ports_where(self, column, value, i, tuples):
        """
        Restrict ports to those whose `column` attribute equals `value`,
        optionally also restricted to interface `i`.
        """

        try:
            mask = self.data[column] == value
            if i is not None:
                mask = mask & (self.data['interface'] == i)
            df = self.data[mask]
        except Exception:
            # Best effort: a failed filter is reported as "no ports" rather
            # than propagated (but interpreter-exit signals are not caught).
            df = None
        return self._wrap_ports(df, tuples)

    def gpot_ports(self, i=None, tuples=False):
        """
        Restrict Interface ports to graded potential ports.

        Parameters
        ----------
        i : int
            Interface identifier. If None, return all graded potential ports.
        tuples : bool
            If True, return a list of tuples; if False, return an
            Interface instance.

        Returns
        -------
        interface : Interface or list of tuples
            Either an Interface instance containing all graded potential ports
            and their attributes in the specified interface, or a list of
            tuples corresponding to the expanded ports.
        """

        return self._ports_where('type', 'gpot', i, tuples)

    def in_ports(self, i=None, tuples=False):
        """
        Restrict Interface ports to input ports.

        Parameters
        ----------
        i : int
            Interface identifier. If None, return all input ports.
        tuples : bool
            If True, return a list of tuples; if False, return an
            Interface instance.

        Returns
        -------
        interface : Interface or list of tuples
            Either an Interface instance containing all input ports and
            their attributes in the specified interface, or a list of tuples
            corresponding to the expanded ports.
        """

        return self._ports_where('io', 'in', i, tuples)

    def interface_ports(self, i=None, tuples=False):
        """
        Restrict Interface ports to specific interface.

        Parameters
        ----------
        i : int
            Interface identifier. If None, return all ports.
        tuples : bool
            If True, return a list of tuples; if False, return an
            Interface instance.

        Returns
        -------
        interface : Interface or list of tuples
            Either an Interface instance containing all ports and
            their attributes in the specified interface, or a list of tuples
            corresponding to the expanded ports.
        """

        if i is None:
            if tuples:
                return self.index.tolist()
            return self.copy()

        try:
            df = self.data[self.data['interface'] == i]
        except Exception:
            df = None
        return self._wrap_ports(df, tuples)

    @staticmethod
    def _pad_index(df, start, count):
        """
        Append `count` blank index levels named str(start), str(start+1), ...
        to `df` in place.
        """

        for n in range(start, start + count):
            new_col = str(n)
            df[new_col] = ''
            df.set_index(new_col, append=True, inplace=True)

    def _merge_on_interfaces(self, a, i, b):
        """
        Merge contents of this and another Interface instance.

        Parameters
        ----------
        a : int
            Identifier of interface in the current instance.
        i : Interface
            Interface instance containing the other interface.
        b : int
            Identifier of interface in instance `i`.

        Returns
        -------
        result : pandas.DataFrame
            Rows of the two interfaces joined on their port identifiers.

        Notes
        -----
        If the number of levels in one Interface instance's DataFrame index is
        greater than that of the other, the number of levels in the index of
        the merged DataFrames instances is set to the former and the index
        with the smaller number is padded with blank entries to enable Panda's
        merge mechanism to function properly.
        """

        assert isinstance(i, Interface)

        # Work on explicit copies so that padding the index below never
        # writes into (or warns about) a slice of the stored data:
        df_left = self.data[self.data['interface'] == a].copy()
        df_right = i.data[i.data['interface'] == b].copy()
        n_left_names = len(self.data.index.names)
        n_right_names = len(i.data.index.names)

        # Pandas' merge mechanism fails if the number of levels in each of the
        # merged MultiIndex indices differs and there is overlap of more than
        # one level; we therefore pad the index with the smaller number of
        # levels before attempting the merge:
        if n_left_names > n_right_names:
            self._pad_index(df_right, i.num_levels,
                            n_left_names - n_right_names)
        elif n_left_names < n_right_names:
            self._pad_index(df_left, self.num_levels,
                            n_right_names - n_left_names)

        return pd.merge(df_left, df_right,
                        left_index=True,
                        right_index=True)

    def get_common_ports(self, a, i, b, t=None):
        """
        Get port identifiers common to this and another Interface instance.

        Parameters
        ----------
        a : int
            Identifier of interface in the current instance.
        i : Interface
            Interface instance containing the other interface.
        b : int
            Identifier of interface in instance `i`.
        t : str or unicode
            If not None, restrict output to those identifiers with the
            specified port type.

        Returns
        -------
        result : list of tuple
            Expanded port identifiers shared by the two specified Interface
            instances.

        Notes
        -----
        The number of levels of the returned port identifiers is equal to the
        maximum number of levels of this Interface instance.

        The order of the returned port identifiers is not guaranteed.
        """

        if t is None:
            x = self.data[self.data['interface'] == a]
            y = i.data[i.data['interface'] == b]
        else:
            x = self.data[(self.data['interface'] == a) &
                          (self.data['type'] == t)]
            y = i.data[(i.data['interface'] == b) &
                       (i.data['type'] == t)]

        def as_tuples(index):
            # Blank padding tokens are dropped so that identifiers with
            # different numbers of levels can be compared:
            if isinstance(index, pd.MultiIndex):
                return [tuple(tok for tok in port if tok != '')
                        for port in index]
            return [(port,) for port in index]

        return list(set(as_tuples(x.index)).intersection(as_tuples(y.index)))

    @staticmethod
    def _row_is_compatible(row):
        """
        Check one merged row: the 'type' attributes must be equal or both
        null, and the 'io' attributes must be inverse or both null.
        """

        same_type = (row['type_x'] == row['type_y']) or \
            (pd.isnull(row['type_x']) and pd.isnull(row['type_y']))
        inverse_io = (row['io_x'] == 'out' and row['io_y'] == 'in') or \
            (row['io_x'] == 'in' and row['io_y'] == 'out') or \
            (pd.isnull(row['io_x']) and pd.isnull(row['io_y']))
        return same_type and inverse_io

    def is_compatible(self, a, i, b, allow_subsets=False):
        """
        Check whether two interfaces can be connected.

        Compares an interface in the current Interface instance with one in
        another instance to determine whether their ports can be connected.

        Parameters
        ----------
        a : int
            Identifier of interface in the current instance.
        i : Interface
            Interface instance containing the other interface.
        b : int
            Identifier of interface in instance `i`.
        allow_subsets : bool
            If True, interfaces that contain a compatible subset of ports are
            deemed to be compatible; otherwise, all ports in the two
            interfaces must be compatible.

        Returns
        -------
        result : bool
            True if both interfaces comprise the same identifiers, the set
            'type' attributes for each matching pair of identifiers in the two
            interfaces match, and each identifier with an 'io' attribute set
            to 'out' in one interface has its 'io' attribute set to 'in' in
            the other interface.

        Notes
        -----
        Assumes that the port identifiers in both interfaces are sorted in the
        same order.
        """

        # Merge the interface data on their indices (i.e., their port
        # identifiers):
        data_merged = self._merge_on_interfaces(a, i, b)

        # Check whether there are compatible subsets, i.e., at least one pair
        # of ports from the two interfaces that are compatible with each
        # other:
        if allow_subsets:

            # If the interfaces share no identical port identifiers, they are
            # incompatible:
            if not len(data_merged):
                return False

            # At least one shared identifier must be compatible:
            if not data_merged.apply(self._row_is_compatible, axis=1).any():
                return False

        # Require that all ports in the two interfaces be compatible:
        else:

            # If one interface contains identifiers not in the other, they are
            # incompatible:
            if len(data_merged) < \
               max(len(self.data[self.data['interface'] == a]),
                   len(i.data[i.data['interface'] == b])):
                return False

            # Every shared identifier must be compatible:
            if not data_merged.apply(self._row_is_compatible, axis=1).all():
                return False

        # All tests passed:
        return True

    def _interface_values(self, s):
        """
        Look up the 'interface' attribute of the ports comprised by a
        selector.
        """

        # Pad the expanded selector with blanks to prevent pandas from
        # spurious matches such as mistakenly validating '/foo' as being in
        # an Interface that only contains the ports '/foo[0:2]':
        idx = self.sel.expand(s, self.idx_levels)
        if not isinstance(self.data.index, pd.MultiIndex):
            idx = [x[0] for x in idx]
        return self.data['interface'].ix[idx]

    def is_in_interfaces(self, s):
        """
        Check whether ports comprised by a selector are in the stored
        interfaces.

        Parameters
        ----------
        s : str or unicode
            Port selector.

        Returns
        -------
        result : bool
            True if the comprised ports are in any of the stored interfaces.
        """

        try:
            d = self._interface_values(s)
            if isinstance(d, int):
                return True
            # A null interface value marks a port that matched nothing:
            if np.any(d.isnull().tolist()):
                return False
            return True
        except Exception:
            # Fall back to a direct membership test if the lookup fails:
            return self.sel.is_in(s, self.index.tolist())

    def out_ports(self, i=None, tuples=False):
        """
        Restrict Interface ports to output ports.

        Parameters
        ----------
        i : int
            Interface identifier. If None, return all output ports.
        tuples : bool
            If True, return a list of tuples; if False, return an
            Interface instance.

        Returns
        -------
        interface : Interface or list of tuples
            Either an Interface instance containing all output ports and
            their attributes in the specified interface, or a list of tuples
            corresponding to the expanded ports.
        """

        return self._ports_where('io', 'out', i, tuples)

    def port_select(self, f, inplace=False):
        """
        Restrict Interface ports with a selection function.

        Returns an Interface instance containing only those rows
        whose ports are passed by the specified selection function.

        Parameters
        ----------
        f : function
            Selection function with a single tuple argument containing
            the various columns of the Interface instance's MultiIndex.
        inplace : bool, default=False
            If True, update and return the given Interface instance.
            Otherwise, return a new instance.

        Returns
        -------
        i : Interface
            Interface instance containing ports selected by `f`.
        """

        assert callable(f)
        if inplace:
            self.data = self.data.select(f)
            return self
        return Interface.from_df(self.data.select(f))

    def spike_ports(self, i=None, tuples=False):
        """
        Restrict Interface ports to spiking ports.

        Parameters
        ----------
        i : int
            Interface identifier. If None, return all spiking ports.
        tuples : bool
            If True, return a list of tuples; if False, return an
            Interface instance.

        Returns
        -------
        interface : Interface or list of tuples
            Either an Interface instance containing all spiking ports and
            their attributes in the specified interface, or a list of tuples
            corresponding to the expanded ports.
        """

        return self._ports_where('type', 'spike', i, tuples)

    def to_selectors(self, i=None):
        """
        Retrieve Interface's port identifiers as list of path-like selectors.

        Parameters
        ----------
        i : int
            Interface identifier. If set to None, return all port identifiers.

        Returns
        -------
        selectors : list of str
            List of selector strings corresponding to each port identifier.
        """

        result = []
        for t in self.to_tuples(i):
            # String tokens become path levels; anything else is rendered as
            # a bracketed index:
            parts = ['/' + s if isinstance(s, (str, unicode)) else '[%s]' % s
                     for s in t]
            result.append(''.join(parts))
        return result

    def to_tuples(self, i=None):
        """
        Retrieve Interface's port identifiers as list of tuples.

        Parameters
        ----------
        i : int
            Interface identifier. If set to None, return all port identifiers.

        Returns
        -------
        result : list of tuple
            List of token tuples corresponding to each port identifier.
        """

        multi = isinstance(self.index, pd.MultiIndex)
        if i is None:
            if multi:
                return self.index.tolist()
            return [(t,) for t in self.index]

        try:
            selected = self.data[self.data['interface'] == i].index
            if multi:
                return selected.tolist()
            return [(t,) for t in selected]
        except Exception:
            return []

    def which_int(self, s):
        """
        Return the interface containing the identifiers comprised by a
        selector.

        Parameters
        ----------
        s : str or unicode
            Port selector.

        Returns
        -------
        i : set
            Set of identifiers for interfaces that contain ports comprised by
            the selector.
        """

        try:
            found = set(self._interface_values(s))
            found.discard(np.nan)
            return found
        except Exception:
            try:
                found = set(self[s, 'interface'].values.flatten())

                # Ignore unset entries:
                found.discard(np.nan)
                return found
            except KeyError:
                return set()

    def __copy__(self):
        """
        Make a copy of this object.
        """

        return self.from_df(self.data)

    # `copy` is the same function object, so it shares the docstring:
    copy = __copy__

    def set_pm(self, t, pm):
        """
        Set port mapper associated with a specific port type.

        Parameters
        ----------
        t : str or unicode
            Port type.
        pm : neurokernel.plsel.BasePortMapper
            Port mapper to save.

        Raises
        ------
        ValueError
            If the mapper contains ports outside the permitted set.
        """

        # Ensure that the ports in the specified port mapper are a subset of
        # those in the interface associated with the specified type:
        assert isinstance(pm, BasePortMapper)
        if t in self.pm:
            allowed = self.pm[t].index.tolist()
        else:
            # No mapper has been stored for this type yet; validate against
            # the interface's own ports of that type instead of failing with
            # a KeyError:
            typed = self.data[self.data['type'] == t].index
            if isinstance(typed, pd.MultiIndex):
                allowed = typed.tolist()
            else:
                allowed = [(p,) for p in typed]
        if not self.sel.is_in(pm.index.tolist(), allowed):
            raise ValueError('cannot set mapper using undefined selectors')
        self.pm[t] = pm.copy()

    def equals(self, other):
        """
        Check whether this interface is equivalent to another interface.

        Parameters
        ----------
        other : neurokernel.pattern.Interface
            Interface instance to compare to this Interface.

        Returns
        -------
        result : bool
            True if the interfaces are identical.

        Notes
        -----
        Interfaces containing the same rows in different orders are not
        regarded as equivalent.
        """

        assert isinstance(other, Interface)
        return self.data.equals(other.data)

    def __len__(self):
        return len(self.data)

    def __repr__(self):
        return 'Interface\n---------\n' + repr(self.data)
[docs]class Pattern(object): """ Connectivity pattern linking sets of interface ports. This class represents connection mappings between interfaces comprising sets of ports. Ports are represented using path-like identifiers; the presence of a row linking the two identifiers in the class' internal index indicates the presence of a connection. A single data attribute ('conn') associated with defined connections is created by default. Specific attributes may be accessed by specifying their names after the port identifiers; if a nonexistent attribute is specified when a sequential value is assigned, a new column for that attribute is automatically created: :: p['/x[0]', '/y[0]', 'conn', 'x'] = [1, 'foo'] The direction of connections between ports in a class instance determines whether they are input or output ports. Ports may not both receive input or emit output. Patterns may contain fan-out connections, i.e., one source port connected to multiple destination ports, but not fan-in connections, i.e., multiple source ports connected to a single destination port. Examples -------- >>> p = Pattern('/x[0:3]','/y[0:4]') >>> p['/x[0]', '/y[0:2]'] = 1 >>> p['/y[2]', '/x[1]'] = 1 >>> p['/y[3]', '/x[2]'] = 1 Attributes ---------- data : pandas.DataFrame Connection attribute data. index : pandas.MultiIndex Index of connections. interface : Interface Interfaces containing port identifiers and attributes. Parameters ---------- sel0, sel1, ...: str, unicode, or sequence Selectors defining the sets of ports potentially connected by the pattern. These selectors must be disjoint, i.e., no identifier comprised by one selector may be in any other selector. columns : sequence of str Data column names. See Also -------- plsel.SelectorMethods """
def __init__(self, *selectors, **kwargs):
    """
    Create an empty pattern over the ports comprised by the selectors.

    Parameters
    ----------
    sel0, sel1, ... : str, unicode, Selector, or sequence
        Selectors defining the sets of ports potentially connected by the
        pattern. They must be disjoint; the ports of the n-th selector are
        assigned interface identifier n.
    columns : sequence of str, default = ['conn']
        Data column names (keyword-only).

    Raises
    ------
    AssertionError
        If the selectors are not disjoint.
    ValueError
        If a selector is of an unsupported type.
    """

    columns = kwargs.get('columns', ['conn'])
    self.sel = SelectorMethods()

    # Force sets of identifiers to be disjoint so that no identifier can
    # denote a port in more than one set:
    assert self.sel.are_disjoint(*selectors)

    # Collect all of the selectors:
    selector = []
    for s in selectors:
        if isinstance(s, Selector) and len(s) != 0:
            selector.extend(s.expanded)
        elif isinstance(s, (str, unicode)):
            selector.extend(self.sel.parse(s))
        elif np.iterable(s):
            selector.extend(s)
        else:
            raise ValueError('invalid selector type')

    # Create Interface instance containing the ports comprised by all of the
    # specified selectors:
    self.interface = Interface(selector)

    # Set the interface identifiers associated with each of the selectors
    # consecutively:
    for i, s in enumerate(selectors):
        self.interface[s, 'interface'] = i

    # Create a MultiIndex that can store mappings between identifiers in the
    # two interfaces; source and destination identifiers use the same number
    # of levels as the interface index:
    n_levels = self.interface.num_levels
    self.num_levels = {'from': n_levels, 'to': n_levels}
    names = ['from_%s' % i for i in range(self.num_levels['from'])] + \
            ['to_%s' % i for i in range(self.num_levels['to'])]
    levels = [[] for _ in names]
    labels = [[] for _ in names]
    idx = pd.MultiIndex(levels=levels, labels=labels, names=names)

    # The pattern starts out with no connections:
    self.data = pd.DataFrame(index=idx, columns=columns, dtype=object)
@property def from_slice(self): """ Slice of pattern index row corresponding to source port(s). """ return slice(0, self.num_levels['from']) @property def to_slice(self): """ Slice of pattern index row corresponding to destination port(s). """ return slice(self.num_levels['from'], self.num_levels['from']+self.num_levels['to']) @property def index(self): """ Pattern index. """ return self.data.index @index.setter def index(self, i): self.data.index = i @property def interface_ids(self): """ Interface identifiers. """ return self.interface.interface_ids @classmethod def _create_from(cls, *selectors, **kwargs): """ Create a Pattern instance from the specified selectors. Parameters ---------- sel0, sel1, ...: str Selectors defining the sets of ports potentially connected by the pattern. These selectors must be disjoint, i.e., no identifier comprised by one selector may be in any other selector, and non-empty. from_sel, to_sel : str Selectors that describe the pattern's initial index. If specified, both selectors must be set. If no selectors are set, the index is initially empty. gpot_sel, spike_sel : str Selectors that describe the graded potential and spiking ports in a pattern's initial index. data : numpy.ndarray, dict, or pandas.DataFrame Data to load store in class instance. columns : sequence of str Data column names. comp_op : str Operator to use to combine selectors into single selector that comprises both the source and destination ports in a pattern. validate : bool If True, validate the index of the Pattern's DataFrame. Returns ------- result : Pattern Pattern instance. 
""" from_sel = kwargs['from_sel'] if kwargs.has_key('from_sel') else None to_sel = kwargs['to_sel'] if kwargs.has_key('to_sel') else None gpot_sel = kwargs['gpot_sel'] if kwargs.has_key('gpot_sel') else None spike_sel = kwargs['spike_sel'] if kwargs.has_key('spike_sel') else None data = kwargs['data'] if kwargs.has_key('data') else None columns = kwargs['columns'] if kwargs.has_key('columns') else ['conn'] comb_op = kwargs['comb_op'] if kwargs.has_key('comb_op') else '+' validate = kwargs['validate'] if kwargs.has_key('validate') else True # Create empty pattern: for s in selectors: if not len(s): raise ValueError('cannot create pattern with empty selector %s' % s) p = cls(*selectors, columns=columns) # Construct index from concatenated selectors if specified: names = p.data.index.names if (from_sel is None and to_sel is None): levels = [[] for i in xrange(len(names))] labels = [[] for i in xrange(len(names))] idx = pd.MultiIndex(levels=levels, labels=labels, names=names) elif isinstance(from_sel, Selector) and isinstance(to_sel, Selector): if comb_op == '.+': idx = p.sel.make_index(Selector.concat(from_sel, to_sel), names) elif comb_op == '+': idx = p.sel.make_index(Selector.prod(from_sel, to_sel), names) else: raise ValueError('incompatible selectors specified') else: idx = p.sel.make_index('(%s)%s(%s)' % (from_sel, comb_op, to_sel), names) if validate: p.__validate_index__(idx) # Replace the pattern's DataFrame: p.data = pd.DataFrame(data=data, index=idx, columns=columns, dtype=object) # Update the `io` attributes of the pattern's interfaces: p.interface[from_sel, 'io'] = 'in' p.interface[to_sel, 'io'] = 'out' # Update the `type` attributes of the pattern's interface: if gpot_sel is not None: p.interface[gpot_sel, 'type'] = 'gpot' if spike_sel is not None: p.interface[spike_sel, 'type'] = 'spike' return p def clear(self): """ Clear all connections in class instance. 
""" self.interface.clear() self.data.drop(self.data.index, inplace=True) @classmethod def from_df(cls, df_int, df_pat): """ Create a Pattern from properly formatted DataFrames. Parameters ---------- df_int : pandas.DataFrame DataFrame with a MultiIndex and data columns 'interface', 'io', and 'type' (additional columns may also be present) that describes the pattern's interfaces. The index's rows must correspond to individual port identifiers. df_pat : pandas.DataFrame DataFrame with a MultiIndex and a data column 'conn' (additional columns may also be present) that describes the connections between ports in the pattern's interfaces. The index's level names must be 'from_0'..'from_N', 'to_0'..'to_M', where N and M are the maximum number of levels in the pattern's two interfaces. """ # Create pattern with phony selectors: pat = cls('/foo[0]', '/bar[0]') # Check that the 'interface' column of the interface DataFrame is set: if any(df_int['interface'].isnull()): raise ValueError('interface column must be set') # Create interface: pat.interface = Interface.from_df(df_int) # The pattern DataFrame's index must contain at least two levels: assert isinstance(df_pat.index, pd.MultiIndex) # Check that pattern DataFrame index levels are named correctly, # i.e., from_0..from_N and to_0..to_N, where N is equal to # pat.interface.num_levels: num_levels = pat.interface.num_levels if df_pat.index.names != ['from_%i' % i for i in xrange(num_levels)]+\ ['to_%i' % i for i in xrange(num_levels)]: raise ValueError('incorrectly named pattern index levels') for t in df_pat.index.tolist(): from_t = t[0:num_levels] to_t = t[num_levels:2*num_levels] if from_t not in df_int.index or to_t not in df_int.index: raise ValueError('pattern DataFrame contains identifiers ' 'not in interface DataFrame') pat.data = df_pat.copy() return pat @classmethod def from_product(cls, *selectors, **kwargs): """ Create pattern from the product of identifiers comprised by two selectors. 
For example: :: p = Pattern.from_product('/foo[0:2]', '/bar[0:2]', from_sel='/foo[0:2]', to_sel='/bar[0:2]', data=1) results in a pattern with the following connections: :: '/foo[0]' -> '/bar[0]' '/foo[0]' -> '/bar[1]' '/foo[1]' -> '/bar[0]' '/foo[1]' -> '/bar[1]' Parameters ---------- sel0, sel1, ...: str Selectors defining the sets of ports potentially connected by the pattern. These selectors must be disjoint, i.e., no identifier comprised by one selector may be in any other selector. from_sel, to_sel : str Selectors that describe the pattern's initial index. If specified, both selectors must be set; the 'io' attribute of the ports comprised by these selectors is respectively set to 'out' and 'in'. If no selectors are set, the index is initially empty. gpot_sel, spike_sel : str Selectors that describe the graded potential and spiking ports in a pattern's initial index. If specified, the 'type' attribute of the ports comprised by these selectors is respectively set to 'gpot' and 'spike'. data : numpy.ndarray, dict, or pandas.DataFrame Data to load store in class instance. columns : sequence of str Data column names. validate : bool If True, validate the index of the Pattern's DataFrame. Returns ------- result : Pattern Pattern instance. 
""" from_sel = kwargs['from_sel'] if kwargs.has_key('from_sel') else None to_sel = kwargs['to_sel'] if kwargs.has_key('to_sel') else None gpot_sel = kwargs['gpot_sel'] if kwargs.has_key('gpot_sel') else None spike_sel = kwargs['spike_sel'] if kwargs.has_key('spike_sel') else None data = kwargs['data'] if kwargs.has_key('data') else None columns = kwargs['columns'] if kwargs.has_key('columns') else ['conn'] validate = kwargs['validate'] if kwargs.has_key('validate') else True return cls._create_from(*selectors, from_sel=from_sel, to_sel=to_sel, gpot_sel=gpot_sel, spike_sel=spike_sel, data=data, columns=columns, comb_op='+', validate=validate) def gpot_ports(self, i=None, tuples=False): return self.interface.gpot_ports(i, tuples) gpot_ports.__doc__ = Interface.gpot_ports.__doc__ def in_ports(self, i=None, tuples=False): return self.interface.in_ports(i, tuples) in_ports.__doc__ = Interface.in_ports.__doc__ def interface_ports(self, i=None, tuples=False): return self.interface.interface_ports(i, tuples) interface_ports.__doc__ = Interface.interface_ports.__doc__ def out_ports(self, i=None, tuples=False): return self.interface.out_ports(i, tuples) out_ports.__doc__ = Interface.out_ports.__doc__ def spike_ports(self, i=None, tuples=False): return self.interface.spike_ports(i, tuples) spike_ports.__doc__ = Interface.spike_ports.__doc__ def connected_ports(self, i=None, tuples=False): """ Return ports that are connected by the pattern. Parameters ---------- i : int Interface identifier. tuples : bool If True, return a list of tuples; if False, return an Interface instance. Returns ------- interface : Interface Either an Interface instance containing all connected ports and their attributes in the specified interface, or a list of tuples corresponding to the expanded ports. Notes ----- Returned ports are listed in lexicographic order. 
""" # Use sets to accumulate the expanded ports to avoid passing duplicates # to DataFrame.ix.__getitem__(): ports = set() for t in self.data.index: ports.add(t[0:self.num_levels['from']]) ports.add(t[self.num_levels['from']:self.num_levels['from']+self.num_levels['to']]) # Sort the expanded ports so that the results are returned in # lexicographic order: df = self.interface.data.ix[sorted(ports)] if i is None: if tuples: return df.index.tolist() else: return Interface.from_df(df) else: if tuples: return df[df['interface'] == i].index.tolist() else: return Interface.from_df(df[df['interface'] == i]) @classmethod def from_concat(cls, *selectors, **kwargs): """ Create pattern from the concatenation of identifers in two selectors. For example: :: p = Pattern.from_concat('/foo[0:2]', '/bar[0:2]', from_sel='/foo[0:2]', to_sel='/bar[0:2]', data=1) results in a pattern with the following connections: :: '/foo[0]' -> '/bar[0]' '/foo[1]' -> '/bar[1]' Parameters ---------- data : numpy.ndarray, dict, or pandas.DataFrame Data to load store in class instance. from_sel, to_sel : str Selectors that describe the pattern's initial index. If specified, both selectors must be set; the 'io' attribute of the ports comprised by these selectors is respectively set to 'out' and 'in'. If no selectors are set, the index is initially empty. gpot_sel, spike_sel : str Selectors that describe the graded potential and spiking ports in a pattern's initial index. If specified, the 'type' attribute of the ports comprised by these selectors is respectively set to 'gpot' and 'spike'. columns : sequence of str Data column names. validate : bool If True, validate the index of the Pattern's DataFrame. Returns ------- result : Pattern Pattern instance. 
""" from_sel = kwargs['from_sel'] if kwargs.has_key('from_sel') else None to_sel = kwargs['to_sel'] if kwargs.has_key('to_sel') else None gpot_sel = kwargs['gpot_sel'] if kwargs.has_key('gpot_sel') else None spike_sel = kwargs['spike_sel'] if kwargs.has_key('spike_sel') else None data = kwargs['data'] if kwargs.has_key('data') else None columns = kwargs['columns'] if kwargs.has_key('columns') else ['conn'] validate = kwargs['validate'] if kwargs.has_key('validate') else True return cls._create_from(*selectors, from_sel=from_sel, to_sel=to_sel, gpot_sel=gpot_sel, spike_sel=spike_sel, data=data, columns=columns, comb_op='.+', validate=validate) def __validate_index__(self, idx): """ Raise an exception if the specified index will result in an invalid pattern. """ # Prohibit duplicate connections: if idx.duplicated().any(): raise ValueError('Duplicate pattern entries detected.') # Prohibit fan-in connections (i.e., patterns whose index has duplicate # 'from' port identifiers): from_idx, to_idx = self.split_multiindex(idx, self.from_slice, self.to_slice) if to_idx.duplicated().any(): raise ValueError('Fan-in pattern entries detected.') # Prohibit ports that both receive input and send output: if set(from_idx).intersection(to_idx): raise ValueError('Ports cannot both receive input and send output.') def which_int(self, s): return self.interface.which_int(s) which_int.__doc__ = Interface.which_int.__doc__ def is_in_interfaces(self, selector): """ Check whether a selector is supported by any stored interface. """ return self.interface.is_in_interfaces(selector) if len(self.interface[selector]) > 0: return True else: return False def connected_port_pairs(self, as_str=False): """ Return connections as pairs of port identifiers. Parameters ---------- as_str : bool If True, return connections as a list of identifier string pairs. Otherwise, return them as pairs of token tuples. 
""" if as_str: return [(self.sel.tokens_to_str(row[self.from_slice]), self.sel.tokens_to_str(row[self.to_slice])) \ for row in self.data.index] else: return [(row[self.from_slice], row[self.to_slice]) \ for row in self.data.index] def __setitem__(self, key, value): # Must pass more than one argument to the [] operators: assert type(key) == tuple # Ensure that specified selectors refer to ports in the # pattern's interfaces: assert self.is_in_interfaces(key[0]) assert self.is_in_interfaces(key[1]) # Ensure that the ports are in different interfaces: assert self.which_int(key[0]) != self.which_int(key[1]) # Expand and pad the specified 'from' and 'to' selectors: key_0_exp = self.sel.expand(key[0], self.num_levels['from']) key_1_exp = self.sel.expand(key[1], self.num_levels['to']) # Concatenate the selectors: selector = tuple(tuple(j for j in itertools.chain(*i)) \ for i in itertools.product(key_0_exp, key_1_exp)) # Try using the selector to select data from the internal DataFrame: try: idx = self.sel.get_index(self.data, selector, names=self.data.index.names) # If the select fails, try to create new rows with the index specified # by the selector and load them with the specified data: except: try: idx = self.sel.make_index(selector, self.data.index.names) except: raise ValueError('cannot create new rows for ambiguous selector %s' % selector) else: found = False else: found = True # Ensure that data to set is in dict form: if len(key) > 2: if np.isscalar(value): data = {k:value for k in key[2:]} elif type(value) == dict: data = value elif np.iterable(value) and len(value) <= len(key[2:]): data={k:v for k, v in zip(key[2:], value)} else: raise ValueError('cannot assign specified value') else: if np.isscalar(value): data = {self.data.columns[0]: value} elif type(value) == dict: data = value elif np.iterable(value) and len(value) <= len(self.data.columns): data={k:v for k, v in zip(self.data.columns, value)} else: raise ValueError('cannot assign specified value') # If 
the specified selectors correspond to existing entries, # set their attributes: if found: for k, v in data.iteritems(): self.data[k].ix[idx] = v # Otherwise, populate a new DataFrame with the specified attributes: else: new_data = self.data.append(pd.DataFrame(data=data, index=idx, dtype=object)) # Validate updated DataFrame's index before updating the instance's # data attribute: self.__validate_index__(new_data.index) self.data = new_data self.data.sort_index(inplace=True) # Update the `io` attributes of the pattern's interfaces: self.interface[key[0], 'io'] = 'in' self.interface[key[1], 'io'] = 'out' def __getitem__(self, key): assert len(key) >= 2 sel_0 = self.sel.expand(key[0]) sel_1 = self.sel.expand(key[1]) selector = [f+t for f, t in itertools.product(sel_0, sel_1)] if len(key) > 2: return self.sel.select(self.data[list(key[2:])], selector=selector) else: return self.sel.select(self.data, selector=selector) def src_idx(self, src_int, dest_int, src_type=None, dest_type=None, dest_ports=None, duplicates=False): """ Retrieve source ports connected to the specified destination ports. Examples -------- >>> p = Pattern('/foo[0:4]', '/bar[0:4]') >>> p['/foo[0]', '/bar[0]'] = 1 >>> p['/foo[1]', '/bar[1]'] = 1 >>> p['/foo[2]', '/bar[2]'] = 1 >>> p['/bar[3]', '/foo[3]'] = 1 >>> all(p.src_idx(0, 1, dest_ports='/bar[0,1]') == [('foo', 0), ('foo', 1)]) True Parameters ---------- src_int, dest_int : int Source and destination interface identifiers. src_type, dest_type : str Types of source and destination ports as listed in their respective interfaces. dest_ports : str Path-like selector corresponding to ports in destination interface. If not specified, all ports in the destination interface are considered. duplicates : bool If True, include duplicate ports in output. Returns ------- idx : list of tuple Source ports connected to the specified destination ports. 
""" assert src_int != dest_int assert src_int in self.interface.interface_ids and \ dest_int in self.interface.interface_ids # Filter destination ports by specified type: if dest_type is None: to_int = self.interface.interface_ports(dest_int) else: to_f = lambda x: x['type'] == dest_type to_int = self.interface.interface_ports(dest_int).data_select(to_f) # Filter destination ports by specified ports: if dest_ports is None: to_idx = to_int.index else: to_idx = to_int[dest_ports].index # Filter source ports by specified type: if src_type is None: from_int = self.interface.interface_ports(src_int) else: from_f = lambda x: x['type'] == src_type from_int = self.interface.interface_ports(src_int).data_select(from_f) from_idx = from_int.index # Construct index from those rows in the pattern whose ports have been # selected by the above code: if isinstance(from_idx, pd.MultiIndex): if isinstance(to_idx, pd.MultiIndex): f = lambda x: x[self.from_slice] in from_idx and x[self.to_slice] in to_idx else: f = lambda x: x[self.from_slice] in from_idx and x[self.to_slice][0] in to_idx else: if isinstance(to_idx, pd.MultiIndex): f = lambda x: x[self.from_slice][0] in from_idx and x[self.to_slice] in to_idx else: f = lambda x: x[self.from_slice][0] in from_idx and x[self.to_slice][0] in to_idx idx = self.data.select(f).index if not duplicates: # Remove duplicate tuples from output without perturbing the order # of the remaining tuples: return OrderedDict.fromkeys([x[self.from_slice] for x in idx]).keys() else: return [x[self.from_slice] for x in idx] def dest_idx(self, src_int, dest_int, src_type=None, dest_type=None, src_ports=None): """ Retrieve destination ports connected to the specified source ports. 
Examples -------- >>> p = Pattern('/foo[0:4]', '/bar[0:4]') >>> p['/foo[0]', '/bar[0]'] = 1 >>> p['/foo[1]', '/bar[1]'] = 1 >>> p['/foo[2]', '/bar[2]'] = 1 >>> p['/bar[3]', '/foo[3]'] = 1 >>> all(p.dest_idx(0, 1, src_ports='/foo[0,1]') == [('bar', 0), ('bar', 1)]) True Parameters ---------- src_int, dest_int : int Source and destination interface identifiers. src_type, dest_type : str Types of source and destination ports as listed in their respective interfaces. src_ports : str Path-like selector corresponding to ports in source interface. If not specified, all ports in the source interface are considered. Returns ------- idx : list of tuple Destination ports connected to the specified source ports. Notes ----- No `duplicates` parameter is provided because fan-in from multiple source ports to a single destination port is not permitted. """ assert src_int != dest_int assert src_int in self.interface.interface_ids and \ dest_int in self.interface.interface_ids # Filter source ports by specified type: if src_type is None: from_int = self.interface.interface_ports(src_int) else: from_f = lambda x: x['type'] == src_type from_int = self.interface.interface_ports(src_int).data_select(from_f) # Filter source ports by specified ports: if src_ports is None: from_idx = from_int.index else: from_idx = from_int[src_ports].index # Filter destination ports by specified type: if dest_type is None: to_int = self.interface.interface_ports(dest_int) else: to_f = lambda x: x['type'] == dest_type to_int = self.interface.interface_ports(dest_int).data_select(to_f) to_idx = to_int.index # Construct index from those rows in the pattern whose ports have been # selected by the above code: if isinstance(from_idx, pd.MultiIndex): if isinstance(to_idx, pd.MultiIndex): f = lambda x: x[self.from_slice] in from_idx and x[self.to_slice] in to_idx else: f = lambda x: x[self.from_slice] in from_idx and x[self.to_slice][0] in to_idx else: if isinstance(to_idx, pd.MultiIndex): f = lambda x: 
x[self.from_slice][0] in from_idx and x[self.to_slice] in to_idx else: f = lambda x: x[self.from_slice][0] in from_idx and x[self.to_slice][0] in to_idx idx = self.data.select(f).index # Remove duplicate tuples from output without perturbing the order # of the remaining tuples: return OrderedDict.fromkeys([x[self.to_slice] for x in idx]).keys() def __len__(self): return self.data.__len__() def __repr__(self): return 'Pattern\n-------\n'+self.data.__repr__() def is_connected(self, from_int, to_int): """ Check whether the specified interfaces are connected. Parameters ---------- from_int, to_int : int Interface identifiers; must be in `self.interface.keys()`. Returns ------- result : bool True if at least one connection from a port identifier in interface `from_int` to a port identifier in interface `to_int` exists. """ assert from_int != to_int assert from_int in self.interface.interface_ids assert to_int in self.interface.interface_ids # Get indices of the 'from' and 'to' interfaces as lists to speed up the # check below [*]: from_idx = set(self.interface.data[self.interface.data['interface'] == from_int].index.tolist()) to_idx = set(self.interface.data[self.interface.data['interface'] == to_int].index.tolist()) # Get index of all defined connections: idx = self.data[self.data['conn'] != 0].index for t in idx: # Split tuple into 'from' and 'to' identifiers; since the interface # index for a 'from' or 'to' identifier is an Index rather than a # MultiIndex, we need to extract a scalar rather than a tuple in the # former case: if self.num_levels['from'] == 1: from_id = t[0] else: from_id = t[0:self.num_levels['from']] if self.num_levels['to'] == 1: to_id = t[self.num_levels['from']] else: to_id = t[self.num_levels['from']:self.num_levels['from']+self.num_levels['to']] # Check whether port identifiers are in the interface indices [*]: if from_id in from_idx and to_id in to_idx: return True return False def from_csv(self, file_name, **kwargs): """ Read connectivity data 
from CSV file. Given N 'from' levels and M 'to' levels in the internal index, the method assumes that the first N+M columns in the file specify the index levels. See Also -------- pandas.read_csv """ # XXX this should refuse to load identifiers that are not in any of the # sets of ports comprised by the pattern: data_names = self.data.columns index_names = self.data.index.names kwargs['names'] = data_names kwargs['index_col'] = range(len(index_names)) data = pd.read_csv(file_name, **kwargs) self.data = data # Restore MultiIndex level names: self.data.index.names = index_names @classmethod def from_graph(cls, g): """Convert a NetworkX directed graph into a Pattern instance. Parameters ---------- g : networkx.DiGraph Graph to convert. The node identifiers must be port identifiers. Returns ------- p : Pattern Pattern instance. Notes ----- The nodes in the specified graph must contain an 'interface' attribute. Port attributes other than 'interface', 'io', and 'type' are not stored in the created Pattern instance's interface. 
""" assert type(g) == nx.DiGraph # Group port identifiers by interface number and whether the ports are # graded potential, or spiking: ports_by_int = {} ports_gpot = [] ports_spike = [] ports_from = [] ports_to = [] for n, data in g.nodes(data=True): assert SelectorMethods.is_identifier(n) assert data.has_key('interface') if not ports_by_int.has_key(data['interface']): ports_by_int[data['interface']] = [] ports_by_int[data['interface']].append(n) if data.has_key('type'): if data['type'] == 'gpot': ports_gpot.append(n) elif data['type'] == 'spike': ports_spike.append(n) # Use connection direction to determine whether ports are source or # destination (XXX should this check whether the io attributes are # consistent with the connection directions?): for f, t in g.edges(): ports_from.append(f) ports_to.append(t) # Create selectors for each interface number: selector_list = [] for interface in sorted(ports_by_int.keys()): selector_list.append(','.join(ports_by_int[interface])) p = cls.from_concat(*selector_list, from_sel=','.join(ports_from), to_sel=','.join(ports_to), gpot_sel=','.join(ports_gpot), spike_sel=','.join(ports_spike), data=1) p.data.sort_index(inplace=True) p.interface.data.sort_index(inplace=True) return p @classmethod def split_multiindex(cls, idx, a, b): """ Split a single MultiIndex into two instances. Parameters ---------- idx : pandas.MultiIndex MultiIndex to split. a, b : slice Ranges of index columns to include in the two resulting instances. Returns ------- idx_a, idx_b : pandas.MultiIndex Resulting MultiIndex instances. """ t_list = idx.tolist() idx_a = pd.MultiIndex.from_tuples([t[a] for t in t_list]) idx_b = pd.MultiIndex.from_tuples([t[b] for t in t_list]) return idx_a, idx_b def to_graph(self): """ Convert the pattern to a networkx directed graph. Returns ------- g : networkx.DiGraph Graph whose nodes are the pattern's ports and whose edges are the pattern's connections. 
Notes ----- The 'conn' attribute of the connections is not transferred to the graph edges. This method relies upon the assumption that the sets of port identifiers comprised by the pattern's interfaces are disjoint. """ g = nx.DiGraph() # Add all of the ports as nodes: for t in self.interface.data.index: if not isinstance(self.interface.data.index, pd.MultiIndex): t = (t,) id = self.sel.tokens_to_str(t) # Replace NaNs with empty strings: d = {k: (v if str(v) != 'nan' else '') \ for k, v in self.interface.data.ix[t].to_dict().iteritems()} # Each node's name corresponds to the port identifier string: g.add_node(id, d) # Add all of the connections as edges: for t in self.data.index: t_from = t[self.from_slice] t_to = t[self.to_slice] id_from = self.sel.tokens_to_str(t_from) id_to = self.sel.tokens_to_str(t_to) d = self.data.ix[t].to_dict() # Discard the 'conn' attribute because the existence of the edge # indicates that the connection exists: if d.has_key('conn'): d.pop('conn') g.add_edge(id_from, id_to, d) return g
def are_compatible(sel_in_0, sel_out_0, sel_spike_0, sel_gpot_0,
                   sel_in_1, sel_out_1, sel_spike_1, sel_gpot_1,
                   allow_subsets=False):
    """
    Check whether two interfaces specified as selectors can be connected.

    Parameters
    ----------
    sel_in_0, sel_out_0, sel_spike_0, sel_gpot_0 : Selector, str, unicode
        Input, output, spiking, and graded potential ports in first interface.
    sel_in_1, sel_out_1, sel_spike_1, sel_gpot_1 : Selector, str, unicode
        Input, output, spiking, and graded potential ports in second
        interface.
    allow_subsets : bool
        If True, interfaces that contain a compatible subset of ports are
        deemed to be compatible; otherwise, all ports in the two interfaces
        must be compatible.

    Returns
    -------
    result : bool
        True if interfaces are compatible, False otherwise.
    """

    # Build the first interface; its full set of ports is the union of the
    # input, output, spiking, and graded potential selectors:
    sel_in_0 = Selector(sel_in_0)
    sel_out_0 = Selector(sel_out_0)
    sel_spike_0 = Selector(sel_spike_0)
    sel_gpot_0 = Selector(sel_gpot_0)
    sel_0 = Selector.union(sel_in_0, sel_out_0, sel_spike_0, sel_gpot_0)
    int_0 = Interface.from_selectors(sel_0, sel_in_0, sel_out_0,
                                     sel_spike_0, sel_gpot_0, sel_0)

    # Build the second interface in the same manner:
    sel_in_1 = Selector(sel_in_1)
    sel_out_1 = Selector(sel_out_1)
    sel_spike_1 = Selector(sel_spike_1)
    sel_gpot_1 = Selector(sel_gpot_1)
    sel_1 = Selector.union(sel_in_1, sel_out_1, sel_spike_1, sel_gpot_1)
    int_1 = Interface.from_selectors(sel_1, sel_in_1, sel_out_1,
                                     sel_spike_1, sel_gpot_1, sel_1)

    # Forward `allow_subsets` so that the caller's choice is honored rather
    # than silently ignored.
    # NOTE(review): assumes Interface.is_compatible accepts an
    # `allow_subsets` keyword - confirm against its definition.
    return int_0.is_compatible(0, int_1, 0, allow_subsets=allow_subsets)