Source code for disamby.core

# -*- coding: utf-8 -*-

"""Main module."""
from collections import Counter, namedtuple
from math import log
from typing import Union

from pandas import DataFrame, Series
from tqdm import tqdm
from networkx import DiGraph

ScoredElement = namedtuple('ScoredElement', ['index', 'score'])
PandasObj = Union[DataFrame, Series]


class Disamby(object):
    """
    Class for disambiguation fitting, scoring and ranking of potential matches.

    A `Disamby` instance stores the pre-processing pipeline applied to the
    strings for a given field, as well as the computed frequencies from the
    entire corpus of strings to match against.
    `Disamby` can be instantiated either with no arguments, or with a list of
    strings, a pandas.Series or a pandas.DataFrame. Passing data triggers an
    immediate call to the `fit` method, whose docstring explains the
    parameters.

    Examples
    --------
    >>> import pandas as pd
    >>> import disamby.preprocessors as pre
    >>> df = pd.DataFrame(
    ...     {'a': ['Luca Georger', 'Luca Geroger', 'Adrian Sulzer'],
    ...      'b': ['Mira, 34, Augsburg', 'Miri, 34, Augsburg', 'Milano, 34']
    ...     }, index=['L1', 'L2', 'O1']
    ... )
    >>> pipeline = [
    ...     pre.normalize_whitespace,
    ...     pre.remove_punctuation,
    ...     pre.trigram
    ... ]
    >>> dis = Disamby(df, pipeline)
    >>> dis.disambiguated_sets(threshold=0.5, verbose=False)
    [{'L2', 'L1'}, {'O1'}]
    """

    def __init__(self, data: PandasObj = None, preprocessors: list = None,
                 field: str = None):
        self.field_freq = dict()
        self.preprocessors = dict()
        self.records = dict()
        self._processed_token_cache = dict()
        self._most_common = dict()
        self._token_to_instance = dict()

        if data is not None:
            if preprocessors is None:
                raise ValueError("Preprocessors not provided")
            self.fit(data, preprocessors, field)
    def fit(self, data: PandasObj, preprocessors: list, field: str = None):
        """
        Computes the frequencies of the terms by field.

        Parameters
        ----------
        data : pandas.DataFrame, pandas.Series or list of strings
            if a DataFrame is given, each column is fitted and the field
            defaults to the column name
        preprocessors : list
            functions to apply, in order; the first function must accept a
            string, and each subsequent function must accept the output of
            the previous one, with the final result being a tuple of strings
        field : str
            string identifying which field this data belongs to

        Examples
        --------
        >>> import pandas as pd
        >>> from disamby.preprocessors import split_words
        >>> df = pd.DataFrame(
        ...     {'a': ['Luca Georger', 'Luke Geroge', 'Adrian Sulzer'],
        ...      'b': ['Mira, 34, Augsburg', 'Miri, 32', 'Milano, 34']
        ...     })
        >>> dis = Disamby()
        >>> prep = [split_words]
        >>> dis.fit(df, prep)
        """
        try:
            columns = data.columns
            for col in columns:
                self._fit_field(data[col], preprocessors=preprocessors)
        except AttributeError:
            self._fit_field(data, preprocessors=preprocessors, field=field)
    def find(self, idx, threshold=0.0, weights: dict = None, **kwargs) -> list:
        """
        Returns the list of scored instances whose score lies above the
        threshold. Note that strings which do not share any token are
        omitted, since their score is 0 by default.

        Parameters
        ----------
        idx
            index of the record to find matches for
        threshold : float
            minimum weighted score a candidate must reach to be returned
        weights : dict
            mapping from field name to the weight of that field's score;
            defaults to equal weights across all fields
        kwargs
            arguments passed on to the score function (i.e. smoother, offset)

        Returns
        -------
        list of ScoredElement
        """
        fields = self.fields
        scored_candidates = dict()
        if weights is None:
            weights = {f: 1 / len(fields) for f in fields}

        for field in fields:
            own_term = self.records[field][idx]
            own_tokens = self._processed_token_cache[field][own_term]

            potential_candidates = set()
            for token in own_tokens:
                potential_candidates |= self._token_to_instance[field][token]

            for candidate in potential_candidates:
                score = self.score(own_term, candidate[1], field, **kwargs)
                candidate_idx = candidate[0]
                if candidate_idx not in scored_candidates:
                    scored_candidates[candidate_idx] = {field: score * weights[field]}
                else:
                    scored_candidates[candidate_idx][field] = score * weights[field]

        # compute the weighted total score per candidate
        final_candidates = []
        for cand_idx, scores in scored_candidates.items():  # renamed to avoid shadowing `idx`
            total_score = sum(scores.values())
            if total_score >= threshold:
                final_candidates.append(ScoredElement(cand_idx, total_score))
        return final_candidates
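
    # Illustrative sketch (hypothetical call, reusing the toy DataFrame from
    # the class docstring; the second score is indicative only): a record
    # always matches itself with score 1.0, since its normalized token
    # weights sum to one.
    #
    #     dis = Disamby(df, pipeline)
    #     dis.find('L1', threshold=0.5)
    #     # e.g. [ScoredElement(index='L1', score=1.0),
    #     #       ScoredElement(index='L2', score=...)]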
    def _fit_field(self, data: PandasObj, preprocessors: list = None,
                   field: str = None):
        if field is None:
            try:
                field = data.name
            except AttributeError:  # was not a pandas.Series
                raise KeyError("The provided data are not a pandas Series; "
                               "if the data is a list you need to provide "
                               "the `field` argument.")
        if not isinstance(data, Series):
            data = Series(data)  # allow plain lists of strings
        if field in self.preprocessors:
            raise ValueError('preprocessors have already been defined for '
                             'this field, cannot redefine. This would render '
                             'the lookup inconsistent')

        self.preprocessors[field] = preprocessors
        self._processed_token_cache[field] = dict()
        self._token_to_instance[field] = dict()
        self.records[field] = dict()
        counter = Counter()
        for i, name in data.items():
            norm_tokens = self.pre_process(name, preprocessors)
            self._processed_token_cache[field][name] = norm_tokens
            counter.update(norm_tokens)
            self.records[field][i] = name
            for token in norm_tokens:
                if token in self._token_to_instance[field]:
                    self._token_to_instance[field][token] |= {(i, name)}
                else:
                    self._token_to_instance[field][token] = {(i, name)}

        self._most_common[field] = counter.most_common(1)[0][1]
        self.field_freq[field] = counter
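
    # Data-layout note (derived from the loop above): `_token_to_instance[field]`
    # is an inverted index mapping each token to the set of
    # (index, original_string) pairs containing it, e.g. (token spellings
    # depend on the chosen preprocessors):
    #
    #     {'luca': {('L1', 'Luca Georger'), ('L2', 'Luca Geroger')}, ...}
    #
    # `find` uses this index to collect candidate matches without scanning
    # the whole corpus.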
    def score(self, term: str, other_term: str, field: str,
              smoother=None, offset=0) -> float:
        """
        Computes the score between the two strings using the frequency data.

        Parameters
        ----------
        term : str
            term to search for
        other_term : str
            the other term to compare to
        field : str
            the name of the column to which this term belongs
        smoother : str (optional)
            one of {None, 'offset', 'log'}
        offset : int
            offset to add to the count; only needed for the smoothers
            'log' and 'offset'

        Returns
        -------
        float

        Notes
        -----
        The score is not commutative, i.e. score(A, B) != score(B, A).
        """
        try:
            own_parts = self._processed_token_cache[field][term]
            other_parts = self._processed_token_cache[field][other_term]
        except KeyError:
            own_parts = self.pre_process(term, self.preprocessors[field])
            other_parts = self.pre_process(other_term, self.preprocessors[field])

        # sum the identification weights of the tokens the two terms share
        weights = self.id_potential(own_parts, field, smoother, offset)
        score = sum(weights.get(tok, 0) for tok in other_parts)
        return score
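
    # Worked toy example (numbers invented for illustration): if
    # `id_potential` assigns term A's tokens the weights
    # {'luca': 0.4, 'georger': 0.6} and term B tokenizes to
    # ('luca', 'meier'), then score(A, B) = 0.4 + 0.0 = 0.4. Scoring in
    # the other direction uses B's own weights instead, which is why the
    # score is not commutative.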
    def id_potential(self, term: Union[tuple, str], field: str,
                     smoother: str = None, offset=0) -> dict:
        """
        Computes normalized weights for the tokens of the term based on
        their observed frequency in the corpus.

        Parameters
        ----------
        term : str, tuple
            term to look up, or a tuple of already pre-processed tokens
        field : str
            field the term falls under
        smoother : str (optional)
            one of {None, 'offset', 'log'}
        offset : int
            offset to add to the count; only needed for the smoothers
            'log' and 'offset'

        Returns
        -------
        dict
            mapping from token to its normalized weight; the weights sum to 1
        """
        if isinstance(term, str):
            words = self.pre_process(term, self.preprocessors[field])
        else:
            words = term

        smoothers = {
            None: self._smooth_none,
            'offset': self._smooth_offset,
            'log': self._smooth_log
        }
        if smoother not in smoothers:
            raise KeyError(
                'Chosen smoother `{}` is not one of {}'.format(
                    smoother, smoothers.keys())
            )

        counter = self.field_freq[field]
        s_fun = smoothers[smoother]
        max_occ = self._most_common[field]
        id_potentials = {
            word: s_fun(counter[word], offset, max_occ)
            for word in words
        }
        total_weight = sum(id_potentials.values())
        return {w: idp / total_weight for w, idp in id_potentials.items()}
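
    # Summary of the smoothers implemented at the bottom of this class
    # (raw values, before normalization), where n is the token's corpus
    # count and max_occ the count of the most frequent token in the field;
    # all denominators are clamped to be at least 1:
    #
    #     None:     1 / n
    #     'offset': 1 / (n + offset)
    #     'log':    log((max_occ + offset + 1) / (n + offset))
    #
    # Rarer tokens therefore receive a larger share of the normalized weight.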
    def alias_graph(self, threshold=0.7, verbose=True,
                    weights=None, **kwargs) -> DiGraph:
        """
        Creates the directed network connecting an instance to another
        through a directed edge if the target instance has a similarity
        score above the threshold.

        Parameters
        ----------
        threshold : float
            between 0 and 1
        verbose : bool
            whether to show the progress bar
        weights : dict
            mapping from field name to the weight of that field's score
        kwargs
            arguments to pass to the score function (i.e. offset, smoother)

        Returns
        -------
        DiGraph
        """
        if not verbose:
            def t(x):
                return x
        else:
            t = tqdm

        edges = []
        fields = self.records.keys()
        a_field = list(fields)[0]
        for idx in t(self.records[a_field]):
            targets = self.find(idx, threshold=threshold,
                                weights=weights, **kwargs)
            new_edges = [(idx, x.index, {'score': x.score}) for x in targets]
            edges.extend(new_edges)

        alias_graph = DiGraph()
        alias_graph.add_edges_from(edges)
        return alias_graph
    def disambiguated_sets(self, threshold=0.7, verbose=True,
                           weights=None, **kwargs):
        """Groups the records into sets of mutual aliases, i.e. the
        strongly connected components of the alias graph."""
        from networkx import strongly_connected_components
        alias_graph = self.alias_graph(threshold, verbose, weights, **kwargs)
        strong = strongly_connected_components(alias_graph)
        return list(strong)
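
    # Reading of the result (derived from the graph construction above): an
    # edge A -> B exists when B scores above the threshold from A's
    # perspective, so a strongly connected component is a set of records
    # that all reach each other, i.e. a set of mutual aliases.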
    @staticmethod
    def pre_process(base_name, functions: list):
        """Apply every function consecutively to base_name."""
        norm_name = base_name
        for f in functions:
            norm_name = f(norm_name)
        return set(norm_name)
    @property
    def fields(self):
        return self.field_freq.keys()

    @staticmethod
    def _smooth_none(occurrences, *args):
        return 1 / max(occurrences, 1)

    @staticmethod
    def _smooth_offset(occurrences, offset, *args):
        return 1 / max(occurrences + offset, 1)

    @staticmethod
    def _smooth_log(occ, offset, max_occ):
        max_offset = max(max_occ + offset + 1, 1)
        word_offset = max(occ + offset, 1)
        return log(max_offset / word_offset)
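

# Minimal usage sketch (mirrors the class docstring example; assumes the
# preprocessors named below exist in disamby.preprocessors, as in the
# doctests above):
if __name__ == '__main__':
    import pandas as pd
    import disamby.preprocessors as pre

    df = pd.DataFrame(
        {'a': ['Luca Georger', 'Luca Geroger', 'Adrian Sulzer'],
         'b': ['Mira, 34, Augsburg', 'Miri, 34, Augsburg', 'Milano, 34']},
        index=['L1', 'L2', 'O1']
    )
    pipeline = [pre.normalize_whitespace, pre.remove_punctuation, pre.trigram]
    dis = Disamby(df, pipeline)
    # expected: [{'L1', 'L2'}, {'O1'}], as in the class docstring
    print(dis.disambiguated_sets(threshold=0.5, verbose=False))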