# Source code for neurokernel.plsel

#!/usr/bin/env python

"""
Path-like row selector for pandas DataFrames with hierarchical MultiIndexes.
"""

import copy
import itertools
import re
import sys

import numpy as np
import pandas as pd
import ply.lex as lex
import ply.yacc as yacc

# Work around lack of support for serializing slices in msgpack 0.4.4:
def _encode(obj):
    if isinstance(obj, slice):
        return {'type': 'slice',
                'data': (obj.start, obj.stop, obj.step)}
    else:
        return obj

def _decode(obj):
    try:
        if obj['type'] == 'slice':
            return slice(*obj['data'])
        else:
            return obj
    except:
        return obj

# Fall back to using pickle for hashing if msgpack isn't available:
try:
    import msgpack
except ImportError:
    # msgpack is unavailable; use whichever pickle implementation matches
    # the running interpreter:
    if sys.version_info.major == 2:
        import cPickle as pickle
    else:
        import pickle

    _packb = pickle.dumps
    _unpackb = pickle.loads
else:
    def _packb(x):
        """Serialize `x` with msgpack, encoding slices via `_encode`."""
        return msgpack.packb(x, default=_encode)

    def _unpackb(x):
        """Deserialize `x` with msgpack, restoring slices via `_decode`."""
        return msgpack.unpackb(x, object_hook=_decode)

class Selector(object):
    """
    Validated and expanded port selector.

    Parameters
    ----------
    s : Selector, str, or unicode
        Existing Selector class instance or string representation.
        The selector may not be ambiguous. If an existing Selector instance
        is specified, the new instance is a copy of the existing instance.

    Attributes
    ----------
    str : str
        String representation of selector.
    expanded : tuple of tuples
        Expanded selector.
    max_levels : int
        Maximum number of levels in selector.
    """

    def __init__(self, s):
        if isinstance(s, Selector):
            self._expanded = copy.copy(s._expanded)
            self._max_levels = copy.copy(s._max_levels)
        else:
            # Strings and every other accepted selector representation are
            # expanded the same way, so no string type check (which would
            # require the Python 2-only name `basestring`) is needed.
            # Save expanded selector as tuple because it shouldn't need to be
            # modified after expansion:
            self._expanded = tuple(SelectorMethods.expand(s))
            self._max_levels = max(map(len, self._expanded))

    @property
    def nonempty(self):
        """
        True if the selector contains identifiers.
        """

        return bool(self._max_levels)

    @property
    def str(self):
        """
        String representation of selector.
        """

        return SelectorMethods.collapse(self._expanded)

    @property
    def expanded(self):
        """
        Expanded selector.
        """

        return self._expanded

    @property
    def identifiers(self):
        """
        List of individual identifiers in selector.
        """

        return [SelectorMethods.collapse((i,)) for i in self._expanded]

    @property
    def max_levels(self):
        """
        Maximum number of levels in selector.
        """

        return self._max_levels

    @classmethod
    def add(cls, *sels):
        """
        Combine the identifiers in multiple selectors into a single selector.

        Parameters
        ----------
        sels : iterable of Selector
            Selector instances.

        Returns
        -------
        result : Selector
            Selector containing all of the port identifiers comprised by all
            of the arguments.

        Notes
        -----
        Duplicate identifiers are not omitted.
        """

        out = cls('')
        try:
            out._max_levels = max([s.max_levels for s in sels if s.nonempty])
        except ValueError:
            # max() of an empty list: every argument was empty.
            out._max_levels = 0

        # Fall back to the empty selector if nothing was collected:
        out._expanded = tuple(i for s in sels
                              for i in s._expanded if s.nonempty) or ((),)
        return out

    @classmethod
    def add_str(cls, *s):
        """
        Combine the identifiers in multiple selector strings into a single
        selector.
        """

        return cls.add(*map(cls, s))

    @classmethod
    def concat(cls, *sels):
        """
        Concatenate the identifiers in multiple selectors elementwise.

        Parameters
        ----------
        sels : iterable of Selector
            Selector instances. All must have the same length.

        Returns
        -------
        result : Selector
            Each port identifier in the returned Selector is equivalent to
            the elementwise concatenation of the identifiers in the listed
            Selector instances.
        """

        out = cls('')
        s_len = None
        e_list = []
        for s in sels:
            # All selectors must comprise the same number of identifiers:
            if s_len is None:
                s_len = len(s)
            else:
                assert len(s) == s_len
            if not e_list:
                e_list = [list(t) for t in s._expanded]
            else:
                for e, t in zip(e_list, s._expanded):
                    e.extend(t)
        out._expanded = tuple(tuple(e) for e in e_list)
        out._max_levels = sum([s.max_levels for s in sels if s.nonempty])
        return out

    @classmethod
    def prod(cls, *sels):
        """
        Compute the product of identifiers in multiple selectors.

        Parameters
        ----------
        sels : iterable of Selector
            Selector instances.

        Returns
        -------
        result : Selector
            Selector containing all of the port identifiers comprised by the
            product of all of the arguments.

        Notes
        -----
        Duplicate identifiers are not omitted.
        """

        out = cls('')
        out._max_levels = sum([s.max_levels for s in sels if s.nonempty])

        # Each element of the product is a tuple of identifiers that is
        # flattened into a single identifier:
        out._expanded = tuple(
            tuple(itertools.chain.from_iterable(i))
            for i in itertools.product(*[s.expanded for s in sels]))
        return out

    @classmethod
    def union(cls, *sels):
        """
        Compute the union of the identifiers in multiple selectors.

        Parameters
        ----------
        sels : iterable of Selector
            Selector instances.

        Returns
        -------
        result : Selector
            Selector containing all of the port identifiers comprised by the
            union of all of the arguments. The identifiers are sorted.
        """

        out = cls('')
        tmp = set()
        for s in sels:
            if s.nonempty:
                tmp = tmp.union(s.expanded)
        if tmp:
            out._expanded = tuple(sorted(tmp))
        else:
            out._expanded = ((),)
        try:
            out._max_levels = max([s.max_levels for s in sels if s.nonempty])
        except ValueError:
            # max() of an empty list: every argument was empty.
            out._max_levels = 0
        return out

    def __add__(self, y):
        return self.add(self, y)

    def __len__(self):
        # The empty selector is stored as a tuple containing one empty tuple:
        if len(self._expanded) == 1 and not self._expanded[0]:
            return 0
        return len(self._expanded)

    def __iter__(self):
        # Each identifier is yielded as an expanded selector comprising one
        # identifier; the empty selector yields itself once:
        if self.nonempty:
            for t in self._expanded:
                yield (t,)
        else:
            yield ((),)

    def __repr__(self):
        s = self.str
        if len(s) <= 100:
            return 'Selector(\'%s\')' % s
        else:
            # Abbreviate long selector strings:
            return 'Selector(\'%s\')' % (s[0:25]+' ... '+s[-25:])
class SelectorParser(object):
    """
    This class implements a parser for path-like selectors that can be
    associated with elements in a sequential data structure such as a Pandas
    DataFrame; in the latter case, each level of the selector corresponds to a
    level of a Pandas MultiIndex. An index level may either be a denoted by a
    string label (e.g., 'foo') or a numerical index (e.g., 0, 1, 2); a selector
    level may additionally be a list of strings (e.g., '[foo,bar]') or integers
    (e.g., '[0,2,4]') or continuous intervals (e.g., '[0:5]'). The '*' symbol
    matches any value in a level, while a range with an open upper bound (e.g.,
    '[5:]') will match all integers greater than or equal to the lower bound.

    Examples of valid selectors include

    ================== =================================
    Selector           Comments
    ================== =================================
    /foo/bar
    /foo+/bar          equivalent to /foo/bar
    /foo/[qux,bar]
    /foo/bar[0]
    /foo/bar/[0]       equivalent to /foo/bar[0]
    /foo/bar/0         equivalent to /foo/bar[0]
    /foo/bar[0,1]
    /foo/bar[0:5]
    /foo/*/baz
    /foo/*/baz[5]
    /foo/bar,/baz/qux
    (/foo,/bar)+/baz   equivalent to /foo/baz,/bar/baz
    /[foo,bar].+/[0:2] equivalent to /foo[0],/bar[1]
    ================== =================================

    Notes
    -----
    An empty string is deemed to be a valid selector.

    Since there is no need to maintain multiple instances of the lexer/parser
    used to process path-like selectors, they are associated with the class
    rather than class instances; likewise, all of the class' methods are
    classmethods.

    Numerical indices in selectors are assumed to be zero-based. Intervals do
    not include the end element (i.e., like numpy, not like Pandas).

    The docstrings of the `t_*` and `p_*` methods are the token regular
    expressions and grammar productions; they must not be edited as
    documentation, and the methods must not be reordered.
    """

    tokens = ('ASTERISK', 'COMMA', 'DOTPLUS', 'INTEGER', 'INTEGER_SET',
              'INTERVAL', 'LPAREN', 'PLUS', 'RPAREN', 'STRING', 'STRING_SET')

    @classmethod
    def _parse_interval_str(cls, s):
        """
        Convert string representation of interval to slice.

        Parameters
        ----------
        s : str
            Interval of the form 'start:stop'; either bound may be omitted.

        Returns
        -------
        result : slice
            Slice whose start defaults to 0 and whose stop defaults to None
            when the respective bound is omitted.
        """

        start, stop = s.split(':')
        start = int(start) if start != '' else 0
        stop = int(stop) if stop != '' else None
        return slice(start, stop)

    @classmethod
    def t_PLUS(cls, t):
        r'\+'
        return t

    @classmethod
    def t_DOTPLUS(cls, t):
        r'\.\+'
        return t

    @classmethod
    def t_COMMA(cls, t):
        r'\,'
        return t

    @classmethod
    def t_LPAREN(cls, t):
        r'\('
        return t

    @classmethod
    def t_RPAREN(cls, t):
        r'\)'
        return t

    # The token value is the bare '*' without the leading slash:
    @classmethod
    def t_ASTERISK(cls, t):
        r'/\*'
        t.value = t.value.strip('/')
        return t

    # The token value is converted to an int:
    @classmethod
    def t_INTEGER(cls, t):
        r'/?\d+'
        t.value = int(t.value.strip('/'))
        return t

    # The token value is converted to a list of ints; a list (rather than
    # the lazy iterator returned by map() in Python 3) is required because
    # the value is later tested against the list type and iterated repeatedly:
    @classmethod
    def t_INTEGER_SET(cls, t):
        r'/?\[(?:\d+,?)+\]'
        t.value = [int(x) for x in t.value.strip('/[]').split(',')]
        return t

    # The token value is converted to a slice:
    @classmethod
    def t_INTERVAL(cls, t):
        r'/?\[\d*\:\d*\]'
        t.value = cls._parse_interval_str(
            re.search(r'\[(.+)\]', t.value).group(1))
        return t

    # The token value is the string without the leading slash:
    @classmethod
    def t_STRING(cls, t):
        r'/[^*/\[\]\(\):,\.\d][^+*/\[\]\(\):,\.]*'
        t.value = t.value.strip('/')
        return t

    # The token value is converted to a list of strings:
    @classmethod
    def t_STRING_SET(cls, t):
        r'/?\[(?:[^+*/\[\]\(\):,\.\d][^+*/\[\]\(\):,\.]*,?)+\]'
        t.value = t.value.strip('/[]').split(',')
        return t

    @classmethod
    def t_error(cls, t):
        raise ValueError('Cannot tokenize selector - illegal character: %s' % t.value[0])

    # A selector is a list of lists of levels:
    @classmethod
    def p_selector_paren_selector(cls, p):
        'selector : LPAREN selector RPAREN'
        p[0] = p[2]

    @classmethod
    def p_selector_comma_selector(cls, p):
        'selector : selector COMMA selector'
        p[0] = p[1]+p[3]

    @classmethod
    def p_selector_plus_selector(cls, p):
        'selector : selector PLUS selector'
        p[0] = [a+b for a, b in itertools.product(p[1], p[3])]

    @classmethod
    def p_selector_dotplus_selector(cls, p):
        'selector : selector DOTPLUS selector'

        # Fully expand both selectors into individual identifiers:
        ids_1 = cls._dotplus_ids(p[1])
        ids_3 = cls._dotplus_ids(p[3])

        # The expanded selectors must comprise the same number of identifiers:
        assert len(ids_1) == len(ids_3)
        p[0] = [a+b for (a, b) in zip(ids_1, ids_3)]

    @classmethod
    def _dotplus_ids(cls, selector):
        """
        Expand a parsed selector into a list of individual identifiers.

        Intervals are expanded into ranges, sets are used as they are, and
        every other token is treated as a single value; the identifiers are
        the product of the values in each level. The parsed selector is not
        modified.
        """

        ids = []
        for levels in selector:
            choices = []
            for token in levels:
                if isinstance(token, slice):
                    choices.append(range(token.start, token.stop))
                elif isinstance(token, list):
                    choices.append(token)
                else:
                    # Wrap scalars so that itertools.product() can iterate
                    # over them:
                    choices.append([token])
            ids.extend(list(x) for x in itertools.product(*choices))
        return ids

    @classmethod
    def p_selector_selector_plus_level(cls, p):
        'selector : selector PLUS level'
        p[0] = [x+[p[3]] for x in p[1]]

    @classmethod
    def p_selector_selector_level(cls, p):
        'selector : selector level'
        p[0] = [x+[p[2]] for x in p[1]]

    @classmethod
    def p_selector_level(cls, p):
        'selector : level'
        p[0] = [[p[1]]]

    @classmethod
    def p_level(cls, p):
        '''level : ASTERISK
                 | INTEGER
                 | INTEGER_SET
                 | INTERVAL
                 | STRING
                 | STRING_SET'''
        p[0] = p[1]

    @classmethod
    def p_error(cls, p):
        raise ValueError('Cannot parse selector - syntax error: %s' % p)

    @classmethod
    def tokenize(cls, selector):
        """
        Tokenize a selector string.

        Parameters
        ----------
        selector : str
            Selector string.

        Returns
        -------
        token_list : list
            List of tokens extracted by ply.

        Notes
        -----
        Uses the class attribute `lexer`, which is not defined in the class
        body and must be assigned before this method is called.
        """

        cls.lexer.input(selector)
        token_list = []
        while True:
            token = cls.lexer.token()
            if not token:
                break
            token_list.append(token)
        return token_list

    @classmethod
    def pad_parsed(cls, selector, pad_len=float('inf'), inplace=True):
        """
        Pad token lists in a parsed selector to some maximum length.

        Parameters
        ----------
        selector : list of lists
            Parsed selector.
        pad_len : int
            Final length of each token list. If set to Inf, all tokens are
            padded to the maximum token length.
        inplace : bool
            If True, modify the selector in place; otherwise, return a
            modified copy.

        Returns
        -------
        result : list of lists
            Padded selector. Token lists that are already at least `pad_len`
            long are left unchanged.
        """

        assert isinstance(selector, list)
        if pad_len == float('inf'):
            pad_len = max(map(len, selector))
        if not inplace:
            selector = copy.deepcopy(selector)
        for x in selector:
            # Multiplying by a nonpositive count appends nothing:
            x += ['']*(pad_len-len(x))
        return selector

    @classmethod
    def parse(cls, selector, pad_len=0):
        """
        Parse a selector string into tokens.

        Parameters
        ----------
        selector : str
            Selector string.
        pad_len : int
            Length to which expanded token sequences should be padded with
            blanks. If infinite, the sequences are padded to the length of
            the longest sequence.

        Returns
        -------
        result : list of list
            List of lists containing the tokens corresponding to each
            individual selector in the string. An empty or all-whitespace
            string is parsed into a single empty token list.

        Notes
        -----
        This method does not expand selectors into the tokens
        corresponding to individual port identifiers.

        Uses the class attributes `parser` and `lexer`, which are not defined
        in the class body and must be assigned before a nonempty selector is
        parsed.

        See Also
        --------
        SelectorMethods.expand
        """

        if re.search(r'^\s*$', selector):
            result = [[]]
        else:
            result = cls.parser.parse(selector, lexer=cls.lexer)
        return cls.pad_parsed(result, pad_len)
[docs]class SelectorMethods(SelectorParser): """ Class for manipulating and using path-like selectors. Contains class methods for expanding selectors, selecting rows from a Pandas DataFrame using a selector, etc. The class can also be used to create new MultiIndex instances from selectors that can be fully expanded into an explicit set of identifiers (and therefore contain no ambiguous symbols such as '*' or '[:]'). """ @classmethod def is_identifier(cls, s): """ Check whether a selector or token sequence can identify a single port. Parameters ---------- s : Selector, str, unicode, or sequence Selector class instance, raw selector string (e.g., '/foo[0:2]'), sequence of token sequences (e.g., [['foo', (0, 2)]]), or sequence of tokens (e.g., ['foo', 0]). Returns ------- result : bool True for a sequence containing only strings and/or integers (e.g., ['foo', 0]) or a selector string that expands into a single sequence of strings and/or integers (e.g., [['foo', 0]]). Notes ----- Can check sequences of tokens (even though a sequence of tokens is not a valid selector). """ if isinstance(s, Selector): return len(s) == 1 if np.iterable(s): # Try to expand string: if type(s) in [str, unicode]: try: s_exp = cls.expand(s) except: return False else: if len(s_exp) == 1: return True else: return False # If all entries are lists or tuples, try to expand: elif all([(type(x) in [list, slice]) for x in s]): if len(cls.expand(s)) == 1: return True else: return False # A sequence of integers and/or strings is a valid port identifier: elif set(map(type, s)).issubset([int, str, unicode]): return True else: return False # A non-iterable cannot be a valid identifier: else: return False @classmethod def to_identifier(cls, s): """ Convert an expanded selector/token sequence into a single port identifier string. Parameters ---------- s : sequence Expanded selector (i.e., a sequence of sequences) or a sequence of string or integer tokens. Returns ------- s : str Port identifier string. 
Notes ----- Accepts sequences of tokens as well as expanded selectors (even though a sequence of tokens is not a valid selector). """ assert type(s) in [list, tuple] if set(map(type, s)).issubset([int, long, str, unicode]): tokens = s else: assert len(s) == 1 tokens = s[0] result = '' for t in tokens: if type(t) == str: result += '/'+t elif type(t) in [int, long]: result += '[%s]' % t else: raise ValueError('Cannot convert to single port identifier.') return result @classmethod def is_ambiguous(cls, selector): """ Check whether a selector cannot be expanded into an explicit list of identifiers. A selector is ambiguous if it contains the symbols '*' or ':]' (i.e., a range with no upper bound). Parameters ---------- selector : Selector, str, unicode or sequence Selector class instance, selector string (e.g., '/foo[0:2]'), or sequence of token sequences (e.g., [['foo', (0, 2)]]). Returns ------- result : bool True if the selector is ambiguous, False otherwise. """ # The Selector class can only encapsulate an unambiguous selector: if isinstance(selector, Selector): return False if type(selector) in [str, unicode]: if re.search(r'(?:\*)|(?:\:\])', selector): return True else: return False elif type(selector) in [list, tuple]: for tokens in selector: for token in tokens: if token == '*' or \ (type(token) == slice and token.stop is None): return True return False else: raise ValueError('invalid selector type') @classmethod def is_selector_empty(cls, selector): """ Check whether a string or sequence is an empty selector. Parameters ---------- s : str, unicode, or sequence String or sequence to test. Returns ------- result : bool True if `s` is a sequence containing empty sequences or a null string, False otherwise. Notes ----- Ambiguous selectors are not deemed to be empty. 
""" if isinstance(selector, Selector): return len(selector) == 0 if type(selector) in [str, unicode] and \ re.search('^\s*$', selector): return True if type(selector) in [list, tuple] and \ all([len(x) == 0 for x in selector]): return True return False @classmethod def is_selector_seq(cls, s): """ Check whether a sequence is a valid selector. Parameters ---------- s : sequence Sequence to test. Returns ------- result : bool True if a sequence of valid token sequences (e.g., [['foo', (0, 2)]], [['bar', 'baz'], ['qux', 0]]), False otherwise. Note ---- An empty sequence (e.g., []) is deemed to be a valid selector. """ assert np.iterable(s) for tokens in s: # The selector must contain sequences of tokens: if not np.iterable(tokens): return False # Each token must either be a string, integer, slice, # list of strings, or list of integers: for token in tokens: if type(token) == list: token_types = set(map(type, token)) if not (token_types.issubset([str, unicode, int, long])): return False elif type(token) not in [slice, str, unicode, int, long]: return False # All tokens are valid: return True @classmethod def is_selector_str(cls, s): """ Check whether a string is a valid selector. Parameters ---------- s : str, unicode String to test. Returns ------- result : bool True if the specified selector is a parseable string (e.g., '/foo[0:2]'), False otherwise. """ assert type(s) in [str, unicode] try: cls.parse(s) except: return False else: return True @classmethod def is_selector(cls, s): """ Check whether a string or sequence is a valid selector. Parameters ---------- s : Selector, str, unicode, or sequence Selector instance, string, or sequence to test. Returns ------- result : bool True if the specified selector is a parseable string (e.g., '/foo[0:2]') or a sequence of valid token sequences. (e.g., [['foo', (0, 2)]], [['bar', 'baz'], ['qux', 0]]). 
""" if isinstance(s, Selector): return True elif type(s) in [str, unicode]: return cls.is_selector_str(s) elif np.iterable(s): return cls.is_selector_seq(s) else: return False @classmethod def expand(cls, selector, pad_len=0): """ Expand an unambiguous selector into a list of identifiers. Parameters ---------- selector : Selector, str, unicode, or sequence Selector class instance, string (e.g., '/foo[0:2]'), or sequence of token sequences (e.g., [['foo', (0, 2)]]). pad_len : int Length to which expanded token sequences should be padded with blanks. If infinite, the sequences are padded to the length of the longest sequence. Returns ------- result : list List of identifiers. If the number of levels in the selector is 1, each is a string or integer token; otherwise, each identifier is a tuple of identifier is a tuple of tokens. Examples -------- >>> from neurokernel.plsel import SelectorMethods >>> SelectorMethods.expand('/foo[0:2]') [('foo', 0), ('foo', 1)] >>> SelectorMethods.expand('/foo,/bar[0:2]', float('inf')) [('foo', ''), ('bar', 0), ('bar', 1)] >>> SelectorMethods.expand('/foo[0:2]', 3) [('foo', 0, ''), ('foo', 1, '')] >>> SelectorMethods.expand('/bar,/foo[0:2]', 3) [('bar', '', ''), ('foo', 0, ''), ('foo', 1, '')] """ if isinstance(selector, Selector): if pad_len == 0: return selector.expanded elif pad_len == float('inf'): return [tuple(x)+('',)*(selector.max_levels-len(x)) \ for x in selector.expanded] else: return [tuple(x)+('',)*(pad_len-len(x)) \ for x in selector.expanded] assert cls.is_selector(selector) assert not cls.is_ambiguous(selector) if type(selector) in [str, unicode]: p = cls.parse(selector) elif np.iterable(selector): # Assume empty iterables are empty selectors: if not selector: selector = [()] # Copy the selector to avoid modifying it: p = copy.copy(selector) else: raise ValueError('invalid selector type') max_levels = 0 temp = [] for i in xrange(len(p)): t = list(p[i]) len_p = len(p[i]) max_levels = max(max_levels, len_p) for j in 
xrange(len_p): # Wrap integers and strings in a list so that # itertools.product() can iterate over them: if type(t[j]) in [int, long, str, unicode]: t[j] = [t[j]] # Expand slices into ranges: elif type(t[j]) == slice: t[j] = range(t[j].start, t[j].stop) temp.append(t) if pad_len == float('inf'): result = [tuple(x)+('',)*(max_levels-len(x)) \ for y in temp for x in itertools.product(*y)] else: result = [tuple(x)+('',)*(pad_len-len(x)) \ for y in temp for x in itertools.product(*y)] # If the selector doesn't expand to anything, return a list containing # an empty tuple: if result: return result else: return [()] @classmethod def is_expandable(cls, selector): """ Check whether a selector can be expanded into multiple identifiers. Parameters ---------- selector : Selector, str, unicode, or sequence Selector class instance, string (e.g., '/foo[0:2]'), or sequence of token sequences (e.g., [['foo', (0, 2)]]). Returns ------- result : bool True if the selector contains any intervals or sets of strings/integers, False otherwise. Ambiguous selectors are not deemed to be expandable, nor are fully expanded selectors or Selector instances. 
""" assert cls.is_selector(selector) if isinstance(selector, Selector) or cls.is_ambiguous(selector): return False if type(selector) in [str, unicode]: p = cls.parse(selector) elif type(selector) in [list, tuple]: p = selector else: raise ValueError('invalid selector type') for i in xrange(len(p)): for j in xrange(len(p[i])): if type(p[i][j]) in [int, long, str, unicode]: p[i][j] = [p[i][j]] elif type(p[i][j]) == slice: p[i][j] = range(p[i][j].start, p[i][j].stop) # The presence of a range containing more than 1 element # implies expandability: if len(p[i][j]) > 1: return True elif type(p[i][j]) == list: # The presence of a list containing more than 1 unique # element implies expandability: if len(set(p[i][j])) > 1: return True else: raise ValueError('invalid selector contents') if len(set([tuple(x) for y in p for x in itertools.product(*y)])) > 1: return True else: return False @staticmethod def are_consecutive(int_list): """ Check whether a list of integers is consecutive. Parameters ---------- int_list : list of int List of integers Returns ------- result : bool True if the integers are consecutive, false otherwise. Notes ----- Does not assume that the list is sorted. """ if set(np.diff(int_list)) == set([1]): return True else: return False @classmethod def tokens_to_str(cls, tokens): """ Convert expanded/parsed token sequence into a single selector string. Parameters ---------- s : sequence Sequence of expanded selector tokens. Returns ------- result : str String corresponding to selector tokens. 
""" assert np.iterable(tokens) result = [] for t in tokens: if type(t) in [str, unicode, int, long]: result.append('/'+str(t)) elif type(t) == slice: start = str(t.start) if t.start is not None else '' stop = str(t.stop) if t.stop is not None else '' result.append('[%s:%s]' % (start, stop)) elif type(t) in [tuple, list]: if not t: raise ValueError('invalid token') result.append('['+','.join(map(str, t))+']') else: raise ValueError('invalid token') return ''.join(result) @classmethod def collapse(cls, selector): """ Collapse a selector into a single string. Parameters ---------- selector : iterable Expanded selector. If the selector is a string, it is returned unchanged. Returns ------- s : str String that comprises all identifiers in the specified expanded selector. """ if isinstance(selector, basestring): return selector if isinstance(selector, Selector): return selector.str assert np.iterable(selector) result_list = [] for tokens in selector: result_list.append(cls.tokens_to_str(tokens)) return ','.join(result_list) @classmethod def _collapse(cls, id_list): """ Collapse a list of identifiers into a selector string. Parameters ---------- id_list : list of tuple List of identifiers; each identifier is a list of token tuples. Returns ------- selector : str String that expands into the given identifier list. Notes ----- Expects all identifiers in the given list to have the same number of levels. 
""" # XXX doesn't collapse expanded selectors such as /foo/xxx,/bar/yyy # properly raise NotImplemented('unfinished method - should eventually replace collapse()') # Can only collapse list identifiers that all have the same number of # levels: assert len(set(map(len, id_list))) == 1 # Collect all tokens for each level: levels = [[] for i in xrange(max(map(len, id_list)))] for i in xrange(len(id_list)): for j in xrange(len(id_list[i])): if not(id_list[i][j] in levels[j]): levels[j].append(id_list[i][j]) def collapse_level(level): """ Recursively called function to collapse all values in a single level. """ type_set = set(map(type, level)) if type_set in set([int, long]): # If a level only contains consecutive integers, convert it into an # interval: level.sort() if cls.are_consecutive(level): return ['[%s:%s]' % (min(level), max(level)+1)] # If a level contains nonconsecutive integers, convert it into a # list: else: return ['['+','.join([str(i) for i in level])+']'] elif type_set in set([str, unicode]): if len(level) == 1: return level else: return ['['+','.join([s for s in level])+']'] else: level_int = sorted([x for x in level if type(x) in [int, long]]) level_str = sorted([x for x in level if type(x) in [str, unicode]]) return collapse_level(level_int)+collapse_level(level_str) # If a level contains multiple string AND integer tokens, convert it to # a list: collapsed_list = [] for level in levels: collapsed_list.append(collapse_level(sorted(level))) selector_list = [] for t in itertools.product(*collapsed_list): selector = '' for s in t: if s[0] == '[': selector += s else: selector = selector + '/' + s selector_list.append(selector) return ','.join(selector_list) @classmethod def are_disjoint(cls, *selectors): """ Check whether several selectors are disjoint. Parameters ---------- s0, s1, ... : Selector, str, unicode, or sequence Selectors to check. 
Each selector is either a Selector class instance, a string (e.g., '/foo[0:2]'), or a sequence of token sequences (e.g., [['foo', (0, 2)]]). Returns ------- result : bool True if none of the identifiers comprised by one selector are comprised by the other. Notes ----- The selectors must not be ambiguous. The empty selector is deemed to be disjoint to all other selectors. """ assert len(selectors) >= 1 assert all(map(cls.is_selector, selectors)) if len(selectors) == 1: return True assert all(map(lambda s: not cls.is_ambiguous(s), selectors)) # Expand selectors into sets of identifiers: ids = set() for selector in selectors: # Skip empty selectors; they are seemed to be disjoint to all # selectors: ids_new = set(map(tuple, cls.expand(selector))) if ids_new == set([()]): continue # If some identifiers are present in both the previous expanded # selectors and the current selector, the selectors cannot be disjoint: if ids.intersection(ids_new): return False else: ids = ids.union(ids_new) return True @classmethod def count_ports(cls, selector): """ Count number of distinct port identifiers in unambigious selector. Parameters ---------- selector : Selector, str, unicode, or sequence Selector class instance, string (e.g., '/foo[0:2]'), or sequence of token sequences (e.g., [['foo', (0, 2)]]). Returns ------- count : int Number of identifiers comprised by selector. """ e = cls.expand(selector) if e == [()] or e == ((),): return 0 else: return len(e) # Need to create cache here because one can't assign create a cache that is # an attribute of the classmethod itself: __max_levels_cache = {} @classmethod def max_levels(cls, selector): """ Return maximum number of token levels in selector. Parameters ---------- selector : Selector, str, unicode, or sequence Selector class instance, string (e.g., '/foo[0:2]'), or sequence of token sequences (e.g., [['foo', slice(0, 2)]]). Returns ------- count : int Maximum number of tokens in selector. 
""" assert cls.is_selector(selector) # Selector class instances already contain max_levels precomputed: if isinstance(selector, Selector): return selector.max_levels # Handle unhashable selectors: try: hash(selector) except: h = _packb(selector) else: h = selector # Use memoization: try: return cls.__max_levels_cache[h] except: if isinstance(selector, Selector): return selector.max_levels elif type(selector) in [str, unicode]: try: count = max(map(len, cls.parse(selector))) except: count = 0 elif type(selector) in [list, tuple]: try: count = max(map(len, selector)) except: count = 0 else: raise ValueError('invalid selector type') cls.__max_levels_cache[h] = count return count @classmethod def _multiindex_row_in(cls, row, parse_list, start=None, stop=None): """ Check whether a row in a MultiIndex matches a parsed selector. Check whether the entries in a (subinterval of a) given tuple of data corresponding to the entries of one row in a MultiIndex match the specified token values. Parameters ---------- row : sequence Data corresponding to a single row of a MultiIndex. parse_list : list List of lists of token values extracted by ply. start, stop : int Start and end indices in `row` over which to test entries. If the Returns ------- result : bool True of all entries in specified subinterval of row match, False otherwise. """ row_sub = row[start:stop] for tokens in parse_list: # A single row will never match an empty token list: if not tokens: continue # Check whether all of the entries in `row_sub` match some list of # tokens. 
If this loop terminates prematurely because of a mismatch # between `row_sub` and some list of tokens in `parse_list`, it will # not return True; this forces checking of the subsequent token # lists: for i, token in enumerate(tokens): # '*' matches everything: if token == '*': continue # Integers and strings must match exactly: elif type(token) in [int, long, str, unicode]: if row_sub[i] != token: break # Tokens must be in a set of values: elif type(token) == list: if row_sub[i] not in token: break # Token must be within range of an interval: elif type(token) == slice: i_start = token.start i_stop = token.stop # Handle intervals with ambiguous start or stop values: if (i_start is not None and row_sub[i] < i_start) or \ (i_stop is not None and row_sub[i] >= i_stop): break else: continue else: return True # If the function still hasn't returned, no match was found: return False @classmethod def _index_row_in(cls, row, parse_list): """ Check whether a row in an Index matches a parsed selector. Check whether a row label in an Index instance matches the specified token values. Parameters ---------- row : scalar Data corresponding to a single row of an Index. parse_list : list List of lists of token values extracted by ply. Returns ------- result : bool True of all entries in specified subinterval of row match, False otherwise. 
""" # Since `row` is a scalar, it need only match the sole entry of one of # the lists in `parse_list`: for tokens in parse_list: if not tokens: continue if len(tokens) > 1: raise ValueError('index row only is scalar') if tokens[0] == '*': return True elif type(tokens[0]) in [int, long, str, unicode]: if row == tokens[0]: return True elif type(tokens[0]) == list: if row in tokens[0]: return True elif type(tokens[0]) == slice: i_start = tokens[0].start i_stop = tokens[0].stop if (i_start is None or row >= i_start) and \ (i_stop is None or row < i_stop): return True else: continue return False @classmethod def is_in(cls, s, t): """ Check whether all of the identifiers in one selector are comprised by another. Parameters ---------- s, t : Selector, str, unicode, or sequence Check whether selector `s` is in `t`. Each selector is either a Selector class instance, a string (e.g., '/foo[0:2]'), or a sequence of token sequences (e.g., [['foo', (0, 2)]]). Returns ------- result : bool True if the first selector is in the second, False otherwise. If `s` is an empty selector, this method always returns True. """ assert cls.is_selector(s) assert cls.is_selector(t) s_exp = set(cls.expand(s)) if s_exp == set([()]): return True t_exp = set(cls.expand(t)) if s_exp.issubset(t_exp): return True else: return False @classmethod def get_tuples(cls, df, selector, start=None, stop=None): """ Return tuples containing index labels selected by specified selector. Parameters ---------- df : pandas.DataFrame DataFrame instance on which to apply the selector. selector : Selector, str, unicode, or sequence Selector class instance, string (e.g., '/foo[0:2]'), or sequence of token sequences (e.g., [['foo', (0, 2)]]). start, stop : int Start and end indices in `row` over which to test entries. If the index of `df` is an Index, these are ignored. Returns ------- result : list List of tuples containing index labels for selected rows. If `df.index` is an Index, the result is a list of labels. 
""" assert cls.is_selector(selector) max_levels = cls.max_levels(selector) if isinstance(selector, Selector): parse_list = selector.expanded elif type(selector) in [str, unicode]: try: parse_list = cls.expand(selector, max_levels) except: parse_list = cls.parse(selector) elif type(selector) in [list, tuple]: parse_list = selector else: raise ValueError('invalid selector type') # The maximum number of tokens must not exceed the number of levels in the # DataFrame's MultiIndex: if max_levels > len(df.index.names[start:stop]): raise ValueError('Maximum number of levels in selector exceeds that of ' 'DataFrame index') if isinstance(df.index, pd.MultiIndex): return [t for t in df.index \ if cls._multiindex_row_in(t, parse_list, start, stop)] else: return [(t,) for t in df.index \ if cls._index_row_in(t, parse_list)] @classmethod def get_index(cls, df, selector, start=None, stop=None, names=[]): """ Return index corresponding to rows selected by specified selector. Parameters ---------- df : pandas.DataFrame DataFrame instance on which to apply the selector. selector : str or unicode Row selector. start, stop : int Start and end indices in `row` over which to test entries. names : scalar or list Name or names of levels to use in generated index. Returns ------- result : pandas.Index or pandas.MultiIndex Index that refers to the rows selected by the specified selector. 
""" assert cls.is_selector(selector) tuples = cls.get_tuples(df, selector, start, stop) if not tuples: raise ValueError('no tuples matching selector found') # XXX This probably could be made faster by directly manipulating the # existing MultiIndex: if all(map(np.iterable, tuples)): if np.iterable(names) and names: return pd.MultiIndex.from_tuples(tuples, names=names) elif names: return pd.MultiIndex.from_tuples(tuples, names=[names]) else: return pd.MultiIndex.from_tuples(tuples) else: if np.iterable(names) and names: return pd.Index(tuples, name=names[0]) elif names: return pd.Index(tuples, name=names) else: return pd.Index(tuples) @classmethod def index_to_selector(cls, idx): """ Convert an index into an expanded port selector. Parameters ---------- idx : pandas.Index or pandas.MultiIndex Index containing port identifiers. Returns ------- selector : list of tuple List of tuples corresponding to individual port identifiers. """ if isinstance(idx, pd.MultiIndex): return idx.tolist() else: return [(i,) for i in idx.tolist()] @classmethod def pad_tuple_list(cls, tuple_list, pad_len): """ Pad a list of tuples with blank strings. Parameters ---------- tuple_list : list of tuples List of tuples to process. pad_len : int Length to which tuples should be padded with blanks. Returns ------- result : list of tuples List of padded tuples. """ return [tuple(x)+('',)*(pad_len-len(x)) for x in tuple_list] @classmethod def pad_selector(cls, selector, pad_len=float('inf')): """ Expand and pad a selector with blank tokens. Expand a selector and pad those port identifier token sequences that contain fewer tokens than the specified maximum. Parameters ---------- selector : Selector, str, unicode, or sequence Selector class instance, string (e.g., '/foo[0:2]'), or sequence of token sequences (e.g., [['foo', slice(0, 2)]]). pad_len : int Length to which expanded token sequences should be padded with blanks. If infinite, the sequences are padded to the length of the longest sequence. 
Returns ------- padded : sequence Sequence of token sequences padded with blank strings. """ if isinstance(selector, Selector): expanded = selector.expanded max_levels = selector.max_levels else: expanded = cls.expand(selector) max_levels = max(map(len, expanded)) if pad_len == float('inf'): return cls.pad_tuple_list(expanded, max_levels) elif pad_len == 0: return expanded else: return cls.pad_tuple_list(expanded, pad_len) @classmethod def make_index_two_concat(cls, sel_0, sel_1, names=[]): """ Create an index from two selectors concatenated elementwise. Parameters ---------- sel_0, sel_1 : str or sequence Selector strings (e.g., '/foo[0:2]') or sequence of token sequences (e.g., [['foo', (0, 2)]]). Both of the selectors must comprise the same number of port identifiers. names : list Names of levels to use in generated MultiIndex. If no names are specified, the levels are assigned increasing integers starting with 0 as their names. Returns ------- result : pandas.MultiIndex MultiIndex whose rows are each the concatenation of the corresponding rows in `sel_0` and `sel_1`. Each row contains twice the maximum number of tokens in the two selectors. Notes ----- The selectors may not contain ambiguous symbols such as '*' or '[:]'. 
""" assert cls.is_selector(sel_0) assert not cls.is_ambiguous(sel_0) assert cls.is_selector(sel_1) assert not cls.is_ambiguous(sel_1) sels_0 = cls.expand(sel_0) sels_1 = cls.expand(sel_1) assert len(sels_0) == len(sels_1) N_sel = len(sels_0) levels = [[]] max_levels_0 = max(map(len, sels_0)) if N_sel else 0 max_levels_1 = max(map(len, sels_1)) if N_sel else 0 max_levels = max(max_levels_0, max_levels_1) selectors = [] for i in xrange(N_sel): # Pad expanded selectors: sels_0[i] = list(sels_0[i]) sels_1[i] = list(sels_1[i]) n = len(sels_0[i]) if n < max_levels: sels_0[i].extend(['' for k in xrange(max_levels-n)]) m = len(sels_1[i]) if m < max_levels: sels_1[i].extend(['' for k in xrange(max_levels-m)]) # Concatenate: selectors.append(sels_0[i]+sels_1[i]) # Extract level values: for k in xrange(max_levels*2): if len(levels) < k+1: levels.append([]) levels[k].append(selectors[-1][k]) # Discard duplicate level values: levels = [sorted(set(level)) for level in levels] # Start with at least one label so that a valid Index will be returned # if the selector is empty: labels = [[]] # Construct label indices: for i in xrange(N_sel): for j in xrange(max_levels*2): if len(labels) < j+1: labels.append([]) labels[j].append(levels[j].index(selectors[i][j])) if not names: names = range(len(levels)) return pd.MultiIndex(levels=levels, labels=labels, names=names) @classmethod def make_index_two_prod(cls, sel_0, sel_1, names=[]): """ Create an index from the product of two selectors. Parameters ---------- sel_0, sel_1 : str or sequence Selector strings (e.g., '/foo[0:2]') or sequence of token sequences (e.g., [['foo', (0, 2)]]). names : list Names of levels to use in generated MultiIndex. If no names are specified, the levels are assigned increasing integers starting with 0 as their names. Returns ------- result : pandas.MultiIndex MultiIndex whose rows are the product of the corresponding rows in `sel_0` and `sel_1`. 
Each row contains twice the maximum number of tokens in the two selectors. Notes ----- The selectors may not contain ambiguous symbols such as '*' or '[:]'. """ assert cls.is_selector(sel_0) assert not cls.is_ambiguous(sel_0) assert cls.is_selector(sel_1) assert not cls.is_ambiguous(sel_1) sels_0 = cls.expand(sel_0) sels_1 = cls.expand(sel_1) N_sel_0 = len(sels_0) N_sel_1 = len(sels_1) levels = [[]] max_levels_0 = max(map(len, sels_0)) if N_sel_0 else 0 max_levels_1 = max(map(len, sels_1)) if N_sel_1 else 0 max_levels = max(max_levels_0, max_levels_1) selectors = [] for i, j in itertools.product(xrange(N_sel_0), xrange(N_sel_1)): # Pad expanded selectors: sels_0[i] = list(sels_0[i]) sels_1[j] = list(sels_1[j]) n = len(sels_0[i]) if n < max_levels: sels_0[i].extend(['' for k in xrange(max_levels-n)]) m = len(sels_1[j]) if m < max_levels: sels_1[j].extend(['' for k in xrange(max_levels-m)]) # Concatenate: selectors.append(sels_0[i]+sels_1[j]) # Extract level values: for k in xrange(max_levels*2): if len(levels) < k+1: levels.append([]) levels[k].append(selectors[-1][k]) # Discard duplicate level values: levels = [sorted(set(level)) for level in levels] # Start with at least one label so that a valid Index will be returned # if the selector is empty: labels = [[]] # Construct label indices: N_sel = N_sel_0*N_sel_1 for i in xrange(N_sel): for j in xrange(max_levels*2): if len(labels) < j+1: labels.append([]) labels[j].append(levels[j].index(selectors[i][j])) if not names: names = range(len(levels)) return pd.MultiIndex(levels=levels, labels=labels, names=names) @classmethod def make_index(cls, selector, names=[]): """ Create an index from the specified selector. Parameters ---------- selector : Selector, str, unicode, or sequence Selector class instance, string (e.g., '/foo[0:2]'), or sequence of token sequences (e.g., [['foo', (0, 2)]]). names : list Names of levels to use in generated MultiIndex. 
If no names are specified, the levels are assigned increasing integers starting with 0 as their names. Returns ------- result : pandas.Index or pandas.MultiIndex MultiIndex corresponding to the specified selector. If the selector only contains a single level, an Index is returned (this is due to a pecularity of pandas). Notes ----- The selector may not contain ambiguous symbols such as '*' or '[:]'. """ assert cls.is_selector(selector) assert not cls.is_ambiguous(selector) # Since nonempty Selectors are already expanded into lists of tuples, # MultiIndex.from_tuples() can be used directly: if isinstance(selector, Selector) and selector.nonempty: if not names: names = range(selector.max_levels) return pd.MultiIndex.from_tuples(selector.expanded, names=names) # XXX It might be preferable to make expand() # convert all output to a tuple rather than just doing so here: selectors = tuple(cls.expand(selector)) N_sel = len(selectors) sel_lens = map(len, selectors) max_levels = max(sel_lens) if N_sel else 0 # NaNs in index are not supported by MultiIndex. Create from tuples # only if all selectors have same levels. 
if len(set(sel_lens)) == 1: if not names: if max_levels: names = range(max_levels) else: names = [0] if selectors == ((),): return pd.MultiIndex(levels=[[] for _ in range(len(names))], labels=[[] for _ in range(len(names))], names=names) else: return pd.MultiIndex.from_tuples(selectors, names=names) # Accumulate unique values for each level of the MultiIndex: levels = [set() for i in xrange(max_levels)] for i in xrange(N_sel): for j in xrange(sel_lens[i]): levels[j].add(selectors[i][j]) for j in xrange(sel_lens[i], max_levels): levels[j].add('') # Sort levels: levels = [sorted(level) for level in levels] # Construct label indices: labels = [[] for i in xrange(max_levels)] for i in xrange(N_sel): for j in xrange(sel_lens[i]): labels[j].append(levels[j].index(selectors[i][j])) for j in xrange(sel_lens[i], max_levels): labels[j].append(levels[j].index('')) if not names: names = range(len(levels)) return pd.MultiIndex(levels=levels, labels=labels, names=names) @classmethod def select(cls, df, selector, start=None, stop=None): """ Select rows from DataFrame using a path-like selector. Parameters ---------- df : pandas.DataFrame DataFrame instance on which to apply the selector. selector : Selector, str, unicode, or sequence Selector class instance, string (e.g., '/foo[0:2]') or sequence of token sequences (e.g., [['foo', (0, 2)]]). start, stop : int Start and end indices in `row` over which to test entries. Returns ------- result : pandas.DataFrame DataFrame containing selected rows. 
""" assert cls.is_selector(selector) if isinstance(selector, Selector): if len(df.index.names[start:stop])>1: try: tks = list(selector.expanded) return df[tks] except: pass parse_list = list(selector.expanded) elif type(selector) in [str, unicode]: if len(df.index.names[start:stop])>1: try: tks = cls.expand(selector) return df[tks] except: pass parse_list = cls.parse(selector) elif type(selector) in [list, tuple]: try: tks = cls.expand(selector) return df[tks] except: pass parse_list = selector else: raise ValueError('invalid selector type') # The number of tokens must not exceed the number of levels in the # DataFrame's MultiIndex; the maximum number of levels in a selector # containing no identifiers is obviously 0: max_levels = max(map(len, parse_list)) if len(parse_list) else 0 if max_levels > len(df.index.names[start:stop]): raise ValueError('Number of levels in selector exceeds number in row subinterval') if type(df.index) == pd.MultiIndex: return df.select(lambda row: cls._multiindex_row_in(row, parse_list, start, stop)) else: return df.select(lambda row: cls._index_row_in(row, parse_list))
# Set the option optimize=1 in the production version; need to perform these # assignments after definition of the rest of the class because the class' # internal namespace can't be accessed within its body definition: optimize = 0 SelectorParser.lexer = lex.lex(module=SelectorParser, optimize=optimize) SelectorParser.parser = yacc.yacc(module=SelectorParser, debug=0, write_tables=0, optimize=optimize)