# Copyright 2024, Battelle Energy Alliance, LLC  ALL RIGHTS RESERVED
"""
Created on October, 2022

@author: dgarrett622, wangc, mandd
"""
from cytoolz import functoolz
import re
import textacy.preprocessing as preprocessing
from numerizer import numerize
import spacy
from spacy.vocab import Vocab

try:
  from contextualSpellCheck.contextualSpellCheck import ContextualSpellCheck
except ModuleNotFoundError as error:
  print("ERROR: Unable to import contextualSpellCheck", error)
  print("Please try to install it via: 'pip install contextualSpellCheck'")
try:
  import autocorrect
except ModuleNotFoundError as error:
  print("ERROR: Unable to import autocorrect", error)
  print("Please try to install it via: 'pip install autocorrect'")
try:
  from spellchecker import SpellChecker as PySpellChecker
except ModuleNotFoundError as error:
  print("ERROR: Unable to import spellchecker", error)
  print("Please try to install it via: 'pip install spellchecker'")

import itertools
import os
import numpy as np
import pandas as pd

from ..similarity.simUtils import wordsSimilarity

# list of available preprocessors in textacy.preprocessing.normalize
textacyNormalize = ['bullet_points', 'hyphenated_words', 'quotation_marks', 'repeating_chars', 'unicode', 'whitespace']
# list of available preprocessors in textacy.preprocessing.remove
textacyRemove = ['accents', 'brackets', 'html_tags', 'punctuation']
# list of available preprocessors in textacy.preprocessing.replace
textacyReplace = ['currency_symbols', 'emails', 'emojis', 'hashtags', 'numbers', 'phone_numbers', 'urls', 'user_handles']
# list of available preprocessors from numerizer
numerizer = ['numerize']
preprocessorDefaultList = ['bullet_points', 'hyphenated_words', 'quotation_marks', 'repeating_chars', 'whitespace', 'unicode', 'accents', 'html_tags', 'punctuation', 'emails', 'emojis', 'hashtags', 'urls', 'numerize', 'whitespace']
preprocessorDefaultOptions = {'repeating_chars': {'chars': ',', 'maxn': 1}, 'unicode': {'form': 'NFKC'}, 'accents': {'fast': False}, 'punctuation': {'only': ["*", "+", ":", "=", "\\", "^", "_", "|", "~", "..", "..."]}}
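# A minimal sketch of what the defaults above do (assuming textacy is installed):
# 'repeating_chars' with chars=',' and maxn=1 collapses runs of commas, e.g.
#   preprocessing.normalize.repeating_chars('ok,,,, go', chars=',', maxn=1)  # -> 'ok, go'
# Note that 'whitespace' appears twice in preprocessorDefaultList, so spacing is
# normalized again after the removal/replacement steps run.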
# TODO: replace & --> and, @ --> at, maybe "/" --> or
class Preprocessing(object):
  """
    NLP Preprocessing class
  """

  def __init__(self, preprocessorList=preprocessorDefaultList, preprocessorOptions=preprocessorDefaultOptions):
    """
      Preprocessing object constructor

      Args:
        preprocessorList: list, list of preprocessor names as strings
        preprocessorOptions: dict, dictionary of dictionaries containing optional arguments
          for preprocessors; the top level key is the name of the preprocessor

      Returns:
        None
    """
    self.functionList = [] # list of preprocessor functions
    self.preprocessorNames = textacyNormalize + textacyRemove + textacyReplace + numerizer
    # collect preprocessor functions in a list
    for name in preprocessorList:
      # strip out options for the preprocessor
      if name in preprocessorOptions:
        options = preprocessorOptions[name]
      else:
        options = {}
      # build the function to do the preprocessing
      if name in textacyNormalize:
        self.createTextacyNormalizeFunction(name, options)
      elif name in textacyRemove:
        self.createTextacyRemoveFunction(name, options)
      elif name in textacyReplace:
        self.createTextacyReplaceFunction(name, options)
      elif name in numerizer:
        # create function to store in functionList
        self.functionList.append(lambda x: numerize(x, ignore=['a ', 'A', 'second']))
      else:
        print(f'{name} is ignored! \nAvailable preprocessors: {self.preprocessorNames}')
    # create the preprocessor pipeline (composition of functionList)
    self.pipeline = functoolz.compose_left(*self.functionList)
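    # Note: compose_left applies the functions in list order, i.e.
    # functoolz.compose_left(f, g)(x) == g(f(x)), so the preprocessors run in the
    # order given in preprocessorList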

  def createTextacyNormalizeFunction(self, name, options):
    """
      Creates a function from textacy.preprocessing.normalize such that its only argument
      is a string, and adds it to the functionList

      Args:
        name: str, name of the preprocessor
        options: dict, dictionary of preprocessor options

      Returns:
        None
    """
    # check for optional arguments
    useChars, useMaxn, useForm = False, False, False
    # options for repeating_chars
    if 'chars' in options and isinstance(options['chars'], str):
      # if chars is not a str, it gets ignored
      useChars = True
    if 'maxn' in options and isinstance(options['maxn'], int):
      # if maxn is not an int, it gets ignored
      useMaxn = True
    # option for unicode
    if 'form' in options and isinstance(options['form'], str):
      # if form is not a str, it gets ignored
      useForm = True
    # build function for the pipeline
    if useChars or useMaxn or useForm:
      # include optional arguments
      f = lambda x: getattr(preprocessing.normalize, name)(x, **options)
    else:
      # no options need to be included
      f = lambda x: getattr(preprocessing.normalize, name)(x)
    # add to the functionList
    self.functionList.append(f)

  def createTextacyRemoveFunction(self, name, options):
    """
      Creates a function from textacy.preprocessing.remove such that its only argument
      is a string, and adds it to the functionList

      Args:
        name: str, name of the preprocessor
        options: dict, dictionary of preprocessor options

      Returns:
        None
    """
    # check for optional arguments
    useFast, useOnly = False, False
    # option for accents
    if 'fast' in options and isinstance(options['fast'], bool):
      # if fast is not a bool, it gets ignored
      useFast = True
    # option for brackets and punctuation
    if 'only' in options and isinstance(options['only'], (str, list, tuple)):
      # if only is not a str, list, or tuple, it gets ignored
      useOnly = True
    # build function for the pipeline
    if useFast or useOnly:
      # include optional arguments
      f = lambda x: getattr(preprocessing.remove, name)(x, **options)
    else:
      # no options need to be included
      f = lambda x: getattr(preprocessing.remove, name)(x)
    # add to the functionList
    self.functionList.append(f)

  def createTextacyReplaceFunction(self, name, options):
    """
      Creates a function from textacy.preprocessing.replace such that its only argument
      is a string, and adds it to the functionList

      Args:
        name: str, name of the preprocessor
        options: dict, dictionary of preprocessor options

      Returns:
        None
    """
    # check for optional argument
    useRepl = False
    if 'repl' in options and isinstance(options['repl'], str):
      # if repl is not a str, it gets ignored
      useRepl = True
    # build function for the pipeline
    if useRepl:
      # include optional argument
      f = lambda x: getattr(preprocessing.replace, name)(x, **options)
    else:
      # no options need to be included
      f = lambda x: getattr(preprocessing.replace, name)(x)
    # add to the functionList
    self.functionList.append(f)

  def __call__(self, text):
    """
      Performs the preprocessing

      Args:
        text: str, string of text to preprocess

      Returns:
        processed: str, string of processed text
    """
    processed = text.strip('\n')
    processed = re.sub(r'&', ' and ', processed)
    # processed = re.sub(r'/', ' and ', processed)
    processed = re.sub(r'@', ' at ', processed)
    processed = self.pipeline(processed)
    return processed
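
# Example usage (a sketch; exact output depends on the installed textacy and
# numerizer versions):
#
#   prep = Preprocessing()  # default pipeline
#   prep('Pump  A  was  replaced twenty-two days ago & restarted')
#   # -> 'Pump A was replaced 22 days ago and restarted'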

class SpellChecker(object):
  """
    Object to find misspelled words and automatically correct spelling

    Note: when using autocorrect, one needs to conduct a spell test to identify the
    threshold (the word frequencies)
  """

  def __init__(self, checker='autocorrect'):
    """
      SpellChecker object constructor

      Args:
        checker: str, optional, spelling corrector to use ('autocorrect', 'pyspellchecker',
          or 'ContextualSpellCheck')

      Returns:
        None
    """
    self.checker = checker.lower()
    # get included and additional dictionary words and update the speller dictionary
    if self.checker == 'autocorrect':
      self.speller = autocorrect.Speller()
      self.includedWords = []
      file2open = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir, 'data', 'ac_additional_words.txt')
      with open(file2open, 'r') as file:
        tmp = file.readlines()
      self.addedWords = list({x.replace('\n', '') for x in tmp})
      self.speller.nlp_data.update({x: 1000000 for x in self.addedWords})
    elif self.checker == 'pyspellchecker':
      self.speller = PySpellChecker()
      self.includedWords = []
      file2open = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir, 'data', 'psc_additional_words.txt')
      with open(file2open, 'r') as file:
        tmp = file.readlines()
      self.addedWords = list({x.replace('\n', '') for x in tmp})
      self.speller.word_frequency.load_words(self.addedWords)
    else:
      name = 'contextual spellcheck'
      self.nlp = spacy.load('en_core_web_lg')
      self.speller = ContextualSpellCheck(self.nlp, name)
      self.includedWords = list(self.speller.BertTokenizer.get_vocab().keys())
      file2open = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir, 'data', 'csc_additional_words.txt')
      with open(file2open, 'r') as file:
        tmp = file.readlines()
      self.addedWords = [x.replace('\n', '') for x in tmp]
      self.speller.vocab = Vocab(strings=self.includedWords + self.addedWords)

  def addWordsToDictionary(self, words):
    """
      Adds a list of words to the spell check dictionary

      Args:
        words: list, list of words to add to the dictionary

      Returns:
        None
    """
    if self.checker == 'autocorrect':
      self.speller.nlp_data.update({word: 1000000 for word in words})
    elif self.checker == 'pyspellchecker':
      self.speller.word_frequency.load_words(self.addedWords + words)
    else:
      self.speller.vocab = Vocab(strings=self.includedWords + self.addedWords + words)

  def getMisspelledWords(self, text):
    """
      Returns a list of words that are misspelled according to the dictionary used

      Args:
        text: str, string of text to check

      Returns:
        misspelled: list, list of misspelled words
    """
    if self.checker == 'autocorrect':
      # corrected = self.speller(text.lower())
      original = re.findall(r'[^\s!,.?":;-]+', text)
      # auto = re.findall(r'[^\s!,.?":;-]+', corrected)
      # misspelled = list({w1 if w1.lower() != w2.lower() else None for w1, w2 in zip(original, auto)})
      misspelled = [word for word in original if word not in self.speller.nlp_data]
      if None in misspelled:
        misspelled.remove(None)
    elif self.checker == 'pyspellchecker':
      original = re.findall(r'[^\s!,.?":;-]+', text)
      misspelled = self.speller.unknown(original)
    else:
      doc = self.nlp(text)
      doc = self.speller(doc)
      misspelled = list({str(x) for x in doc._.suggestions_spellCheck.keys()})
    return misspelled

  def correct(self, text):
    """
      Performs automatic spelling correction and returns corrected text

      Args:
        text: str, string of text to correct

      Returns:
        corrected: str, spelling corrected text
    """
    if self.checker == 'autocorrect':
      corrected = self.speller(text)
    elif self.checker == 'pyspellchecker':
      l = re.split(r"([A-Za-z]+(?=\s|\.))", text)
      corrected = []
      for elem in l:
        if len(elem) == 0:
          corrected.append(elem)
        elif not re.search("[^A-Za-z]+", elem):
          if elem in self.speller:
            corrected.append(elem)
          else:
            corrected.append(self.speller.correction(elem))
        else:
          corrected.append(elem)
      corrected = "".join(corrected)
    else:
      doc = self.nlp(text)
      doc = self.speller(doc)
      corrected = doc._.outcome_spellCheck
    return corrected
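
  # Example usage (a sketch; actual corrections depend on the backend and its
  # dictionaries):
  #
  #   checker = SpellChecker(checker='autocorrect')
  #   checker.getMisspelledWords('The pumpp is brokken')  # -> e.g. ['pumpp', 'brokken']
  #   checker.correct('The pumpp is brokken')             # -> e.g. 'The pump is broken'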

  def handleAbbreviations(self, abbrDatabase, text, type):
    """
      Performs automatic correction of abbreviations and returns corrected text

      This method relies on a database of abbreviations located at:
      `src/nlp/data/abbreviations.xlsx`
      This database contains the most common abbreviations collected from the literature,
      and for each abbreviation it provides the corresponding full word(s); an abbreviation
      might have multiple words associated with it. In such a case, the full word that makes
      the most sense given the context is chosen (see the findOptimalOption method)

      Args:
        abbrDatabase: pandas dataframe, dataframe containing the library of abbreviations
          and their corresponding full expressions
        text: str, string of text that will be analyzed
        type: str, type of abbreviation method ('spellcheck', 'hard', 'mixed') employed to
          determine which words are abbreviations that need to be expanded

          * spellcheck: the spellchecker is used to identify words that are not recognized
          * hard: directly search for the abbreviations in the provided sentence
          * mixed: perform first a "hard" search followed by a "spellcheck" search

      Returns:
        bestOpt: str, corrected text; if multiple expansions are possible, the best option
          (see findOptimalOption) is returned; if no abbreviations are found, the original
          text is returned
    """
    abbreviationSet = set(abbrDatabase['Abbreviation'].values)
    if type == 'spellcheck':
      unknowns = self.getMisspelledWords(text)
    elif type == 'hard' or type == 'mixed':
      unknowns = []
      splitSent = text.split()
      for word in splitSent:
        if word.lower() in abbreviationSet:
          unknowns.append(word)
      if type == 'mixed':
        set1 = set(self.getMisspelledWords(text))
        set2 = set(unknowns)
        unknowns = list(set1.union(set2))
    corrections = {}
    for word in unknowns:
      if word.lower() in abbreviationSet:
        locs = list(abbrDatabase['Abbreviation'][abbrDatabase['Abbreviation'] == word.lower()].index.values)
        if locs:
          corrections[word] = abbrDatabase['Full'][locs].values.tolist()
        else:
          print(word)
      else:
        # Here we address the fact that the abbreviation database will never be complete.
        # Given an abbreviation that is not part of the database, we look for a subset of
        # abbreviations in the database that are close enough, and consider them as
        # possible candidates
        from difflib import SequenceMatcher
        corrections[word] = []
        abbreviationDS = abbrDatabase['Abbreviation'].values
        for index, abbr in enumerate(abbreviationDS):
          if SequenceMatcher(None, word, abbr).ratio() > 0.8:
            corrections[word].append(abbrDatabase['Full'].values.tolist()[index])
        if not corrections[word]:
          corrections.pop(word)
    combinations = list(itertools.product(*list(corrections.values())))
    options = []
    for comb in combinations:
      corrected = text
      for index, key in enumerate(corrections.keys()):
        corrected = re.sub(r"\b%s\b" % str(key), comb[index], corrected)
      options.append(corrected)
    if not options:
      return text
    else:
      bestOpt = self.findOptimalOption(options)
      return bestOpt

  def generateAbbrDict(self, abbrDatabase):
    """
      Generates an abbreviations dictionary that can be used by handleAbbreviationsDict

      Args:
        abbrDatabase: pandas dataframe, dataframe containing the library of abbreviations
          and their corresponding full expressions

      Returns:
        abbrDict: dict, an abbreviations dictionary mapping each abbreviation to its list
          of possible full expressions
    """
    abbrDict = {}
    # there may be a more efficient way to do the following
    for row in abbrDatabase.itertuples():
      abbrs = abbrDict.get(row.Abbreviation, [])
      abbrs.append(row.Full)
      abbrDict[row.Abbreviation] = abbrs
    return abbrDict
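
  # Sketch of the resulting structure: for a dataframe with rows
  # (Abbreviation='pmp', Full='pump') and (Abbreviation='temp', Full='temperature'),
  # generateAbbrDict returns {'pmp': ['pump'], 'temp': ['temperature']}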

  def handleAbbreviationsDict(self, abbrDict, text, type):
    """
      Performs automatic correction of abbreviations and returns corrected text

      This method relies on a database of abbreviations located at:
      `src/nlp/data/abbreviations.xlsx`
      This database contains the most common abbreviations collected from the literature,
      and for each abbreviation it provides the corresponding full word(s); an abbreviation
      might have multiple words associated with it. In such a case, the full word that makes
      the most sense given the context is chosen (see the findOptimalOption method)

      Args:
        abbrDict: dict, dictionary containing the library of abbreviations and their
          corresponding full expressions
        text: str, string of text that will be analyzed
        type: str, type of abbreviation method ('spellcheck', 'hard', 'mixed') employed to
          determine which words are abbreviations that need to be expanded

          * spellcheck: the spellchecker is used to identify words that are not recognized
          * hard: directly search for the abbreviations in the provided sentence
          * mixed: perform first a "hard" search followed by a "spellcheck" search

      Returns:
        bestOpt: str, corrected text; if multiple expansions are possible, the best option
          (see findOptimalOption) is returned; if no abbreviations are found, the original
          text is returned
    """
    if type == 'spellcheck':
      unknowns = self.getMisspelledWords(text)
    elif type == 'hard' or type == 'mixed':
      unknowns = []
      splitSent = text.split()
      for word in splitSent:
        if word.lower() in abbrDict.keys():
          unknowns.append(word)
      if type == 'mixed':
        set1 = set(self.getMisspelledWords(text))
        set2 = set(unknowns)
        unknowns = list(set1.union(set2))
    corrections = {}
    for word in unknowns:
      if word.lower() in abbrDict.keys():
        if len(abbrDict[word.lower()]) > 0:
          corrections[word] = abbrDict[word.lower()]
      else:
        # Here we address the fact that the abbreviation database will never be complete.
        # Given an abbreviation that is not part of the database, we look for a subset of
        # abbreviations in the database that are close enough, and consider them as
        # possible candidates
        from difflib import SequenceMatcher
        corrections[word] = []
        abbreviationDS = list(abbrDict)
        # keep the expansions of the best-scoring close match (ratio of at least 0.75);
        # val persists across the loop so the best match wins, not the last one
        val = 0
        for abbr in abbreviationDS:
          newVal = SequenceMatcher(None, word, abbr).ratio()
          if newVal >= 0.75 and newVal > val:
            corrections[word] = abbrDict[abbr]
            val = newVal
        if not corrections[word]:
          corrections.pop(word)
    combinations = list(itertools.product(*list(corrections.values())))
    options = []
    for comb in combinations:
      corrected = text
      for index, key in enumerate(corrections.keys()):
        corrected = re.sub(r"\b%s\b" % str(key), comb[index], corrected)
      options.append(corrected)
    if not options:
      return text
    else:
      bestOpt = self.findOptimalOption(options)
      return bestOpt

  def findOptimalOption(self, options):
    """
      Method to handle abbreviations with multiple meanings: each candidate sentence is
      scored by summing the pairwise similarities of its words, and the highest scoring
      option is returned

      Args:
        options: list, list of sentence options

      Returns:
        optimalOpt: str, the option from the provided options list that best fits the context
    """
    nOpt = len(options)
    combScore = np.zeros(nOpt)
    for index, opt in enumerate(options):
      listOpt = opt.split()
      for i, word in enumerate(listOpt):
        for j in range(i + 1, len(listOpt)):
          combScore[index] = combScore[index] + wordsSimilarity(word, listOpt[j])
    optIndex = np.argmax(combScore)
    optimalOpt = options[optIndex]
    return optimalOpt
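
  # Scoring sketch for findOptimalOption: for an option whose words are [w1, w2, w3],
  # the score is
  #   wordsSimilarity(w1, w2) + wordsSimilarity(w1, w3) + wordsSimilarity(w2, w3)
  # so, assuming wordsSimilarity scores semantically related words higher, an expansion
  # like 'the pump pressure is low' outranks 'the pamphlet pressure is low'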

class AbbrExpander(object):
  """
    Class to expand abbreviations
  """

  def __init__(self, abbreviationsFilename, checkerType='autocorrect', abbrType='mixed'):
    """
      Abbreviation expander constructor

      Args:
        abbreviationsFilename: str, filename of the abbreviations data
        checkerType: str, optional, spelling corrector to use (see SpellChecker)
        abbrType: str, optional, abbreviation handling method ('spellcheck', 'hard', 'mixed')

      Returns:
        None
    """
    self.abbrType = abbrType
    self.checkerType = checkerType
    self.abbrList = pd.read_excel(abbreviationsFilename)
    self.preprocessorList = ['hyphenated_words', 'whitespace', 'numerize']
    self.preprocess = Preprocessing(self.preprocessorList, {})
    self.checker = SpellChecker(checker=self.checkerType)
    self.abbrDict = self.checker.generateAbbrDict(self.abbrList)

  def abbrProcess(self, text, splitToList=False):
    """
      Expands the abbreviations in text

      Args:
        text: str, the text to expand
        splitToList: bool, optional, if True the text is split into sentences and each
          sentence is processed separately

      Returns:
        expandedText: str, the text with abbreviations expanded
    """
    text = self.preprocess(text)
    if not splitToList:
      expandedText = self.checker.handleAbbreviationsDict(self.abbrDict, text.lower(), type=self.abbrType)
    else:
      text = text.replace("\n", "")
      textList = [t.strip() for t in text.split('.')]
      expandedText = []
      for t in textList:
        cleanedText = self.checker.handleAbbreviationsDict(self.abbrDict, t.lower(), type=self.abbrType)
        expandedText.append(cleanedText)
      expandedText = '. '.join(expandedText)
    return expandedText
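
# Example usage (a sketch; 'abbreviations.xlsx' is a stand-in path for the actual
# abbreviations file, which must provide 'Abbreviation' and 'Full' columns):
#
#   expander = AbbrExpander('abbreviations.xlsx')
#   expander.abbrProcess('clean and inspect pmp bearings')
#   # -> e.g. 'clean and inspect pump bearings'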