
# Copyright 2024, Battelle Energy Alliance, LLC  ALL RIGHTS RESERVED
"""
Created in March 2022

@author: wangc, mandd
"""
import spacy
from spacy.language import Language
from spacy.tokens import Span
from spacy.matcher import Matcher
from spacy.tokens import Token
# filter_spans is used to resolve the overlap issue in entities
# It gives primacy to longer spans (entities)
from spacy.util import filter_spans
import pandas as pd

# use pysbd as a sentencizer component for spacy
import pysbd
from ..config import nlpConfig

import logging

# import sys

# sys.setrecursionlimit(10000)


logger = logging.getLogger(__name__)
#### Using spacy's Token extensions for coreferee
if Token.has_extension('ref_n'):
  _ = Token.remove_extension('ref_n')
if Token.has_extension('ref_t'):
  _ = Token.remove_extension('ref_t')
if Token.has_extension('ref_t_'):
  _ = Token.remove_extension('ref_t_')
Token.set_extension('ref_n', default='')
Token.set_extension('ref_t', default='')
if not Token.has_extension('alias'):
  Token.set_extension('alias', default=None)
if not Span.has_extension('health_status'):
  Span.set_extension("health_status", default=None)
if not Span.has_extension('alias'):
  Span.set_extension("alias", default=None)
if not Token.has_extension('ref_ent'):
  Token.set_extension("ref_ent", default=None)
customLabel = ['STRUCTURE', 'COMPONENT', 'SYSTEM']

# Use the config file to update the aliasLookup dictionary
aliasLookup = {}
if 'alias_file' in nlpConfig['files']:
  df = pd.read_csv(nlpConfig['files']['alias_file'], index_col='alias')
  aliasLookup.update(df.to_dict()['name'])
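
# The alias file is expected to be a CSV with at least the columns 'alias' and 'name'
# (the rows below are hypothetical examples, not shipped data):
#
#   alias,name
#   rcp,reactor coolant pump
#   cvcs,chemical and volume control system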

def getEntID():
  """
    Retrieve the entity ID and entity label from the config file; both default to "SSC"

    Args:
      None

    Returns:
      entID: str, the entity ID
      entLabel: str, the entity label
  """
  if 'params' in nlpConfig:
    entLabel = nlpConfig['params'].get('ent_label', "SSC")
    entID = nlpConfig['params'].get('ent_id', "SSC")
  else:
    entLabel = "SSC"
    entID = "SSC"
  return entID, entLabel
# order of the NLP pipeline: 'ner' --> 'normEntities' --> 'merge_entities' --> 'initCoref'
# --> 'aliasResolver' --> 'coreferee' --> 'anaphorCoref'

@Language.component("normEntities")
def normEntities(doc):
  """
    Normalize named entities: remove the leading article and the trailing particle

    Args:
      doc: spacy.tokens.doc.Doc, the processed document using nlp pipelines

    Returns:
      doc: spacy.tokens.doc.Doc, the document after normalizing the named entities
  """
  ents = []
  for ent in doc.ents:
    if ent[0].pos_ == "DET": # leading article
      ent = Span(doc, ent.start+1, ent.end, label=ent.label)
    if len(ent) > 0:
      if ent[-1].pos_ == "PART": # trailing particle like 's
        ent = Span(doc, ent.start, ent.end-1, label=ent.label)
      if len(ent) > 0:
        ents.append(ent)
  doc.ents = tuple(ents)
  return doc
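
# A minimal usage sketch for this component (assumes an installed spaCy model
# "en_core_web_sm"; the sample text and recognized entities are hypothetical):
#
#   nlp = spacy.load("en_core_web_sm")
#   nlp.add_pipe("normEntities", after="ner")
#   doc = nlp("The Reactor Coolant Pump 's shaft was replaced.")
#   # an entity span like "The Reactor Coolant Pump 's" is narrowed to "Reactor Coolant Pump"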
@Language.component("initCoref")
def initCoref(doc):
  """
    Initialize the coreference: assign each entity's text and label to the custom
    extensions ``ref_n`` and ``ref_t`` of its first token

    Args:
      doc: spacy.tokens.doc.Doc, the processed document using nlp pipelines

    Returns:
      doc: spacy.tokens.doc.Doc, the document after initializing the coreference attributes
  """
  for e in doc.ents:
    # if e.label_ in customLabel:
    e[0]._.ref_n, e[0]._.ref_t = e.text, e.label_
  return doc
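
# Sketch of the effect (assumes NER has recognized an entity; hypothetical text):
#
#   nlp.add_pipe("initCoref", after="merge_entities")
#   doc = nlp("The pump failed.")
#   ent = doc.ents[0]
#   # ent[0]._.ref_n == ent.text and ent[0]._.ref_t == ent.label_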
@Language.component("aliasResolver")
def aliasResolver(doc):
  """
    Look up aliases and store the result in the ``alias`` extension

    Args:
      doc: spacy.tokens.doc.Doc, the processed document using nlp pipelines

    Returns:
      doc: spacy.tokens.doc.Doc, the document after the alias lookup
  """
  for ent in doc.ents:
    alias = ent.text.lower()
    if alias in aliasLookup:
      name = aliasLookup[alias]
      ent._.set('alias', name)
  return doc
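
# Sketch, assuming the alias file maps 'rcp' to 'reactor coolant pump' (hypothetical):
#
#   nlp.add_pipe("aliasResolver", after="initCoref")
#   doc = nlp("RCP was overhauled.")
#   # if "RCP" is recognized as an entity, doc.ents[0]._.alias == 'reactor coolant pump'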
def propagateEntType(doc):
  """
    Propagate the entity type stored in ``ref_t``

    Args:
      doc: spacy.tokens.doc.Doc, the processed document using nlp pipelines

    Returns:
      doc: spacy.tokens.doc.Doc, the document after propagating the entity types
  """
  ents = []
  for e in doc.ents:
    if e[0]._.ref_n != '': # if e is a coreference
      e = Span(doc, e.start, e.end, label=e[0]._.ref_t)
    ents.append(e)
  doc.ents = tuple(ents)
  return doc
@Language.component("anaphorCoref")
def anaphorCoref(doc):
  """
    Anaphora resolution using coreferee. This pipeline needs to be added after NER.
    The assumption here is: the entities need to be recognized first, then the pipeline
    ``initCoref`` assigns the initial custom attributes ``ref_n`` and ``ref_t``, and the
    pipeline ``aliasResolver`` resolves all the aliases used in the text. After these
    pre-processes, the ``anaphorCoref`` pipeline can be used to resolve the coreference.

    Args:
      doc: spacy.tokens.doc.Doc, the processed document using nlp pipelines

    Returns:
      doc: spacy.tokens.doc.Doc, the document after the anaphora resolution using coreferee
  """
  if not Token.has_extension('coref_chains'):
    return doc
  for token in doc:
    coref = token._.coref_chains
    # if the token is a coreference and not already dereferenced
    if coref and token._.ref_n == '':
      # check all the references; if "ref_n" is available (determined by NER and initCoref),
      # its value will be assigned to the current token
      for chain in coref:
        for ref in chain:
          refToken = doc[ref[0]]
          if refToken._.ref_n != '':
            token._.ref_n = refToken._.ref_n
            token._.ref_t = refToken._.ref_t
            break
  return doc
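
# Sketch with coreferee (importing coreferee registers its "coreferee" component;
# a spaCy model supported by coreferee is assumed):
#
#   import coreferee
#   nlp.add_pipe("coreferee")
#   nlp.add_pipe("anaphorCoref", after="coreferee")
#   doc = nlp("The pump failed. It was replaced.")
#   # the pronoun "It" inherits ref_n/ref_t from "pump" through its coreference chain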
@Language.component("anaphorEntCoref")
def anaphorEntCoref(doc):
  """
    Anaphora resolution for entities using coreferee. This pipeline needs to be added
    after NER, with the same assumptions as ``anaphorCoref``: the entities need to be
    recognized first, ``initCoref`` assigns the initial custom attributes ``ref_n`` and
    ``ref_t``, and ``aliasResolver`` resolves all the aliases used in the text. After
    these pre-processes, the ``anaphorEntCoref`` pipeline can be used to resolve the
    coreference.

    Args:
      doc: spacy.tokens.doc.Doc, the processed document using nlp pipelines

    Returns:
      doc: spacy.tokens.doc.Doc, the document after the anaphora resolution using coreferee
  """
  if not Token.has_extension('coref_chains'):
    return doc
  for ent in doc.ents:
    for token in ent:
      coref = token._.coref_chains
      if not coref:
        continue
      for chain in coref:
        for ref in chain:
          for index in ref:
            refToken = doc[index]
            if refToken._.ref_ent is None:
              refToken._.ref_ent = ent
  return doc
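
# Sketch (same pipeline assumptions as anaphorCoref):
#
#   nlp.add_pipe("anaphorEntCoref", after="coreferee")
#   doc = nlp("The pump failed. It was replaced.")
#   # tokens that corefer with an entity now point back to the entity span via ._.ref_ent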
@Language.component("expandEntities")
def expandEntities(doc):
  """
    Expand the current entities: a recursive function that extends each entity with the
    NOUN tokens that precede it

    Args:
      doc: spacy.tokens.doc.Doc, the processed document using nlp pipelines

    Returns:
      doc: spacy.tokens.doc.Doc, the document after expansion of the current entities
  """
  newEnts = []
  isUpdated = False
  entID, _ = getEntID()
  for ent in doc.ents:
    if ent.ent_id_ == entID and ent.start != 0:
      prevToken = doc[ent.start - 1]
      if prevToken.pos_ in ['NOUN']:
        newEnt = Span(doc, ent.start - 1, ent.end, label=ent.label)
        newEnts.append(newEnt)
        isUpdated = True
    else:
      newEnts.append(ent)
  doc.ents = filter_spans(list(doc.ents) + newEnts)
  if isUpdated:
    doc = expandEntities(doc)
  return doc
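
# Sketch (assumes an entity whose ent_id_ matches the configured ID, e.g. "SSC";
# the text and labeling are hypothetical):
#
#   nlp.add_pipe("expandEntities", after="ner")
#   doc = nlp("The coolant pump failed.")
#   # if only "pump" was labeled, the preceding NOUN "coolant" is folded in,
#   # yielding the expanded entity "coolant pump"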
# A recursive implementation hits Python's default recursion limit (1000): when processing
# record by record, the number of calls equals the number of records, so the limit would
# be reached. The implementation below therefore merges entities iteratively.
@Language.component("mergeEntitiesWithSameID")
def mergeEntitiesWithSameID(doc):
  """
    Merge adjacent entities that share the same entity ID

    Args:
      doc: spacy.tokens.doc.Doc, the processed document using nlp pipelines

    Returns:
      doc: spacy.tokens.doc.Doc, the document after merging entities with the same ID
  """
  newEnts = []
  isUpdated = False
  ents = list(doc.ents)
  entID, _ = getEntID()
  for i in range(len(ents)-1):
    ent1, ent2 = ents[i], ents[i+1]
    start = ent1.start
    end = ent1.end
    label = ent1.label
    alias = ent1._.alias
    if ent1.ent_id_ == entID and not isUpdated:
      # extend the start to include a preceding NUM token (optionally separated by punctuation)
      if start == 1:
        prev = doc[start - 1]
        if prev.pos_ in ['NUM']:
          start = prev.i
      elif start > 1:
        prev1, prev2 = doc[start-1], doc[start-2]
        if prev1.pos_ in ['NUM']:
          start = prev1.i
        elif prev1.dep_ in ['punct'] and prev2.pos_ in ['NUM']:
          start = prev2.i
      if ent2.ent_id_ == entID:
        # do not merge when ent1 is an object and ent2 is a subject (likely different clauses)
        cond = ent1.root.dep_ in ['pobj', 'dobj', 'iobj', 'obj', 'obl', 'oprd'] and ent2.root.dep_ in ['nsubj', 'nsubjpass', 'nsubj:pass']
        if not cond:
          if end == ent2.start or (end == ent2.start - 1 and doc[end].dep_ in ['punct']):
            end = ent2.end
            label = ent2.label
            if ent2._.alias:
              alias = ent2._.alias
      if end == ent1.end:
        # extend the end to include a following NUM token (optionally separated by punctuation)
        if end < ent1.sent.end - 1:
          post1, post2 = doc[end], doc[end+1]
          if post1.pos_ in ['NUM']:
            end = end + 1
          elif post1.dep_ in ['punct'] and post2.pos_ in ['NUM']:
            end = end + 2
        elif end == ent1.sent.end:
          post = doc[end]
          if post.pos_ in ['NUM']:
            end = end + 1
      if start != ent1.start or end != ent1.end:
        isUpdated = True
        if len(newEnts) == 0 or (len(newEnts) > 0 and start >= newEnts[-1].end):
          newEnt = Span(doc, start, end, label=label)
          newEnt._.set('alias', alias)
          newEnts.append(newEnt)
        if len(newEnts) > 0 and start < newEnts[-1].end:
          # the new span overlaps the previously merged span: fuse them into one
          preEnt = newEnts[-1]
          newEnt = Span(doc, preEnt.start, end, label=label)
          alias = alias if alias else preEnt._.alias
          newEnt._.set('alias', alias)
          newEnts[-1] = newEnt
  # Note: doc.retokenize() cannot set custom span attributes here, so the merged spans
  # are combined with the existing entities via filter_spans instead.
  doc.ents = filter_spans(list(doc.ents) + newEnts)
  return doc
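
# Sketch (assumes adjacent entities carrying the configured ID, e.g. "SSC"; hypothetical):
#
#   nlp.add_pipe("mergeEntitiesWithSameID", after="ner")
#   doc = nlp("Pump 1A was repaired.")
#   # an entity "Pump" followed by a NUM token such as "1A" can be merged into "Pump 1A",
#   # and two same-ID entities separated only by punctuation are fused into one span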
@Language.component("mergePhrase")
def mergePhrase(doc):
  """
    Merge each noun phrase into a single token. This method keeps ``DET`` and ``PART``
    tokens; use the ``normEntities`` pipeline after this one to remove them.

    Args:
      doc: spacy.tokens.doc.Doc, the processed document using nlp pipelines

    Returns:
      doc: spacy.tokens.doc.Doc, the document after merging the noun phrases
  """
  def isNum(nounChunks):
    for elem in nounChunks:
      if elem.pos_ == 'NUM':
        return True, elem
    return False, None

  with doc.retokenize() as retokenizer:
    for np in doc.noun_chunks:
      # skip ents since ents are recognized by the OPM model and the entity_ruler
      # TODO: we may expand the ents, combined with pipeline "expandEntities"
      if len(list(np.ents)) > 1:
        continue
      elif len(list(np.ents)) == 1:
        if np.ents[0].label_ not in ['causal_keywords', 'ORG', 'DATE']:
          continue
      # when a number is present, merge the chunk but keep the attributes from the number
      num, elem = isNum(np)
      if not num:
        attrs = {
          "tag": np.root.tag_,
          "lemma": np.root.lemma_,
          "pos": np.root.pos_,
          "ent_type": np.root.ent_type_,
          "_": {
            "ref_n": np.root._.ref_n,
            "ref_t": np.root._.ref_t,
          },
        }
      else:
        attrs = {
          "tag": elem.tag_,
          "lemma": elem.lemma_,
          "pos": elem.pos_,
          "ent_type": np.root.ent_type_,
          "_": {
            "ref_n": np.root._.ref_n,
            "ref_t": np.root._.ref_t,
          },
        }
      retokenizer.merge(np, attrs=attrs)
  return doc
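
# Sketch (noun chunks that do not overlap recognized entities are merged; hypothetical text):
#
#   nlp.add_pipe("mergePhrase", after="ner")
#   doc = nlp("The auxiliary feedwater system was tested.")
#   # "The auxiliary feedwater system" becomes a single token; running "normEntities"
#   # afterwards removes the leading determiner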
@Language.component("pysbdSentenceBoundaries")
def pysbdSentenceBoundaries(doc):
  """
    Use pysbd as a sentencizer component for spacy

    Args:
      doc: spacy.tokens.doc.Doc, the processed document using nlp pipelines

    Returns:
      doc: spacy.tokens.doc.Doc, the document after sentence boundary detection
  """
  seg = pysbd.Segmenter(language="en", clean=False, char_span=True)
  sentsCharSpans = seg.segment(doc.text)
  charSpans = [doc.char_span(sentSpan.start, sentSpan.end, alignment_mode="contract")
               for sentSpan in sentsCharSpans]
  startTokenIds = [span[0].idx for span in charSpans if span is not None]
  for token in doc:
    token.is_sent_start = True if token.idx in startTokenIds else False
  return doc
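
# Sketch: sentence boundaries must be set before the parser runs (assumes an installed
# spaCy model "en_core_web_sm"; the sample text is hypothetical):
#
#   nlp = spacy.load("en_core_web_sm")
#   nlp.add_pipe("pysbdSentenceBoundaries", before="parser")
#   doc = nlp("The pump failed at approx. 2 p.m. It was replaced.")
#   # pysbd keeps abbreviations like "approx." and "p.m." from triggering false splits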