Source code for henchman.selection

# -*- coding: utf-8 -*-

'''The selection module.

Contents:
        RandomSelect (X, n_feats): Choose n_feats at random
        Dendrogram (X, pairing_func, max_threshes)
'''
import numpy as np
import pandas as pd
import math
import random

from tqdm import tqdm
from collections import defaultdict

from henchman.learning import create_model


[docs]class RandomSelect: """Randomly choose a feature set. """
[docs] def __init__(self, names=None, n_feats=0): '''A class for randomly choosing a feature set. Args: names (list[str]): A list of column names selected. Default is the empty list. n_feats (int): The number of features to randomly select. ''' self.names = [] self.n_feats = n_feats
def set_params(self, **params): '''Method to functionally assign parameters. Expects a dictionary ``**params`` as input. ''' for key in params: setattr(self, key, params[key]) return self def fit(self, X, x=None): '''Randomly choose which features to select. Args: X (pd.Dataframe): A dataframe from which to select a subset of columns. ''' X = pd.DataFrame(X) column_list = [i for i in range(0, len(X.columns))] random.shuffle(column_list) column_list = column_list[:self.n_feats] all_columns = X.columns.values self.names = [all_columns[item] for item in column_list] def transform(self, X): '''Returns a subset of a dataframe. Args: X (pd.DataFrame): A dataframe with the same column names as the one with which the selector was fit. Returns: X_trans (pd.DataFrame): The dataframe subset X[self.names]. ''' X = pd.DataFrame(X) return X[self.names]
[docs]class Dendrogram(): """ Pair features by an arbitrary function. Creates a dendrogram which is a set of graphs representing connectivity at a set of discrete thresholds. """
[docs] def __init__(self, X=None, pairing_func=None, max_threshes=None): '''An object to store graphs for a given pairing function. If given a dataframe X this first creates an adjacency matrix given a certain pairing function. It will then go through and build endges and graphs from those edge-vertex pairs. The graphs are all stored in order. Args: X (pd.DataFrame): The dataframe for which to build the Dendrogram. pairing_func (func): A function which takes in two columns and returns a number. max_threshes (int): The maximum number of graphs to build. ''' if X is not None: self.fit(X, pairing_func=pairing_func, max_threshes=max_threshes)
[docs] def fit(self, X, pairing_func=None, max_threshes=None): '''Build graphs for a given pairing function. First creates an adjacency matrix given a certain pairing function. It will then go through and build endges and graphs from those edge-vertex pairs. The graphs are all stored in order. Args: X (pd.DataFrame): The dataframe for which to build the Dendrogram. pairing_func (func): A function which takes in two columns and returns a number. max_threshes (int): The maximum number of graphs to build. ''' if pairing_func is None: pairing_func = _one_minus_corr # Create adjacency matrix and columns list self.adj, self.columns = adj_maker(X, pairing_func) # Make edges for every thresh self._build_edges(max_threshes) # Make graphs for every thresh self._build_graphs() assert len(self.edges) > 0, 'Failed to build edges' assert len(self.graphs) > 0, 'Failed to build graphs'
[docs] def set_params(self, **params): '''Method to functionally assign parameters. Expects a dictionary ``**params`` as input. ''' for key in params: setattr(self, key, params[key]) return self
def _find_all_graphs(self): self.graphs = [] for edges in tqdm(self.edges): out = find_connected_components(list(self.columns.keys()), edges) self.graphs.append(out) def _build_graphs(self): if self.graphs == []: self._find_all_graphs() for i, graph in enumerate(self.graphs): if len(graph) == 1: break self.threshlist = self.threshlist[:i] self.edges = self.edges[:i] self.graphs = self.graphs[:i] def _build_edges(self, max_threshes): self.edges = [] self.graphs = [] uniques = list(np.unique(self.adj)) uniques = [x for x in uniques if not math.isnan(x)] if max_threshes is None and len(uniques) > 500: print('Calculating more than 500 graphs') print('You can pass max_threshes as a kwarg to Dendrogram') if max_threshes is not None: if len(uniques) > max_threshes: # Take max_threshes evenly spaced thresholds uniques = uniques[::int(np.floor((len(uniques)/max_threshes)) + 1)] self.threshlist = uniques for thresh in tqdm(self.threshlist): self.edges.append(_edge_maker(self.adj, thresh))
[docs] def features_at_step(self, step): '''Find the representatives at a certain step for a given graph. Args: step (int): Which position in self.threshlist to show features from. Returns: A list of features at ``step``. (list[str]) ''' featurelist = [self.columns[x] for x, _ in self.graphs[step].items()] return featurelist
[docs] def score_at_point(self, X, y, model, metric, step, n_splits=1): '''A helper method for scoring a Dendrogram at a step. Args: X (pd.DataFrame): A dataframe with the same columns that the Dendrogram was built with. y (pd.Series): Labels for X. step (int): Which position in self.threshlist to show features from. scoring_func (func): A function which takes in X and y. ''' featurelist = self.features_at_step(step) print('Using {} features'.format(len(featurelist))) return create_model(X[featurelist], y, model, metric, n_splits=n_splits)
[docs] def shuffle_score_at_point(self, X, y, model, metric, step, n_splits=1): '''A helper method for scoring a Dendrogram at a step. This method shuffles the graph representatives and then runs ``score_at_point``. By running shuffle and score at point multiple times, you can get an impression of how representative a particular feature set is of the underlying graph structure. Args: X (pd.DataFrame): A dataframe with the same columns that the Dendrogram was built with. y (pd.Series): Labels for X. model: A sklearn model with fit and predict methods. metric: A metric which takes y_test, preds and returns a score. step (int): Which position in self.threshlist to show features from. n_splits (int): If 1 use a train_test_split. Otherwise use tssplit. Default value is 1. ''' self.shuffle_all_representatives() return self.score_at_point(X, y, model, metric, step, n_splits=n_splits)
[docs] def find_set_of_size(self, size): '''Finds a column set of a certain size in the Dendrogram. This checks graphs until there are only ``size`` remaining components. Args: size (int): The number of features you want to end up with. Returns: The step at which there are ``size`` connected components. (int) ''' for i, graph in enumerate(self.graphs): if len(graph.keys()) <= size: print("There are {} distinct connected components " "at thresh step {} in the Dendrogram".format( len(graph.keys()), i)) if i > 0: prevlength = len(self.graphs[i - 1].keys()) print("You might also be interested in" " {} components at step {}".format(prevlength, i - 1)) return i print("Warning, could not find requested size, returning set of size {}".format( len(self.graphs[-1]))) return (len(self.graphs) - 1)
def shuffle_all_representatives(self): '''Shuffle representatives for every graph in ``D.graphs``. For every graph, look through the list associated to each key and choose one to be the new key. Note that keys are not fixed step by step, so the key for a cluster at step n is not guarenteed to be the key for the same cluster at step n+1. ''' assert self.graphs != [], 'Run D._build_graphs to get a graph' templist = [] for graph in self.graphs: temp = defaultdict(set) for key in graph.keys(): newkey = random.choice(list(graph[key])) temp[newkey] = graph[key] templist.append(temp) self.graphs = templist
[docs] def transform(self, X, n_feats=10): '''Return a dataframe of a particular size. Args: X (pd.Dataframe): The dataframe to transform. n_feats (int): The number of columns to return ''' assert X.shape[1] >= n_feats step = self.find_set_of_size(n_feats) return X[self.features_at_step(step)]
def adj_maker(data, pairing_func): '''Given a dataframe and a pairing function make an adjacency graph and a dictionary of columns. The dictionary can be used to associate column position to column name. Args: data (pd.DataFrame): A dataframe from which to make an adjacency graph. pairing_function (func): A function which takes in two columns and returns a number. Returns: adj, columns (np.array, dict[int, str]): An adjacency graph and a dictionary pairing column locations with column names. ''' adj = np.zeros((data.shape[1], data.shape[1])) for i, col1 in enumerate(data): for j, col2 in enumerate(data): adj[j][i] = pairing_func(data[col1], data[col2]) columns = {i: col for i, col in enumerate(data)} return adj, columns def _edge_maker(adj, thresh): '''Make all edges at a given threshold. Prerequisite to make the associated graph. ''' it = np.nditer(adj, flags=['multi_index']) edges = [] for val in it: if val <= thresh: edges.append(it.multi_index) return edges def find_connected_components(vertices, edges): '''Make a graph from a list of vertices and edges. Do a depth first search to make a dictionary whose keys are representatives and values is a list of vertices in a given component. It is assumed all edges are symmetric. Args: vertices (list[int]): A list of vertex locations. edges (list[tuple[int, int]]): A list of pairs of vertices. ''' d = defaultdict(set) for edge in edges: d[edge[0]].add(edge[1]) d[edge[1]].add(edge[0]) out = defaultdict(set) visited_list = [] temp_list = [] for vertex in vertices: if vertex not in visited_list: temp_list.extend([x for x in d[vertex]]) visited_list.append(vertex) out[vertex].add(vertex) while temp_list != []: newv = temp_list.pop() if newv in visited_list: pass else: temp_list.extend([x for x in d[newv]]) out[vertex].add(newv) visited_list.append(newv) return out def _one_minus_corr(a, b): "returns the absolute value of the correlation between a and b" return 1 - np.abs(np.corrcoef(a, b, rowvar=False)[0][1])