Source code for src.dackar.workflows.RuleBasedMatcher

# Copyright 2024, Battelle Energy Alliance, LLC  ALL RIGHTS RESERVED
"""
Created on March, 2022

@author: wangc, mandd
"""
import logging
import pandas as pd
import re
import copy
from spacy.tokens import Token
from spacy.tokens import Span
from collections import deque

from ..config import nlpConfig
from .WorkflowBase import WorkflowBase

# Module-level logger named after this module; the methods below use it for info/debug/warning messages.
[docs] logger = logging.getLogger(__name__)
class RuleBasedMatcher(WorkflowBase):
    """
      Rule Based Matcher Class
    """

    def __init__(self, nlp, entID='SSC', causalKeywordID='causal', *args, **kwargs):
        """
          Construct the matcher; every argument is forwarded unchanged to the parent class constructor

          Args:

            nlp: spacy.Language object, contains all components and data needed to process text
            entID: str, entity ID forwarded to the parent constructor (default 'SSC')
            causalKeywordID: str, causal keyword ID forwarded to the parent constructor (default 'causal')
            args: list, positional arguments
            kwargs: dict, keyword arguments

          Returns:

            None
        """
        super().__init__(nlp, entID, causalKeywordID, *args, **kwargs)
def reset(self):
    """
      Reset rule-based matcher by delegating to the parent class ``reset``

      Args:

        None

      Returns:

        None
    """
    super().reset()
def extractInformation(self):
    """
      Extract health status, status and causal relations from the matched sentences

      The health status table is written to ``nlpConfig['files']['output_health_status_file']``
      and kept in ``self._entHS``; the status table is kept in ``self._entStatus`` only (it is not
      written to file); the causal relations gathered in ``self._extractedCausals`` are written to
      ``nlpConfig['files']['output_causal_effect_file']``. Finally the general extraction method
      is applied to the sentences that contain causal keywords and its result is printed.

      Args:

        None

      Returns:

        None
    """
    ## health status
    logger.info('Start to extract health status')
    self.extractHealthStatus(self._matchedSents)
    ## Access health status and output to an ordered csv file
    entList = []
    hsList = []
    svList = []
    kwList = []
    cjList = []
    sentList = []
    hsPrependAmod = []
    hsPrepend = []
    hsAppend = []
    hsAppendAmod = []
    negList = []
    negTextList = []
    for sent in self._matchedSents:
        ents = self.getCustomEnts(sent.ents, self._entityLabels[self._entID])
        if ents is None:
            # getCustomEnts yields None when the sentence has no entity with the requested
            # labels (extractHealthStatus guards the same way); nothing to report here
            continue
        for ent in ents:
            if ent._.health_status is not None:
                entList.append(ent.text)
                hsList.append(ent._.health_status)
                svList.append(ent._.ent_status_verb)
                kwList.append(ent._.hs_keyword)
                cjList.append(ent._.conjecture)
                sentList.append(sent.text.strip('\n'))
                hsPrepend.append(ent._.health_status_prepend)
                hsPrependAmod.append(ent._.health_status_prepend_amod)
                hsAppend.append(ent._.health_status_append)
                hsAppendAmod.append(ent._.health_status_append_amod)
                negList.append(ent._.neg)
                negTextList.append(ent._.neg_text)
    ## include 'root' in the output
    df = pd.DataFrame({'entities':entList,
                       'root':svList,
                       'status keywords':kwList,
                       'health status':hsList,
                       'conjecture':cjList,
                       'sentence':sentList,
                       'health status prepend': hsPrepend,
                       'health status prepend adjectival modifier':hsPrependAmod,
                       'health status append': hsAppend,
                       'health status append adjectival modifier': hsAppendAmod,
                       'negation':negList,
                       'negation text': negTextList})
    df.to_csv(nlpConfig['files']['output_health_status_file'],
              columns=['entities', 'conjecture', 'negation', 'negation text', 'root','status keywords',
                       'health status prepend adjectival modifier', 'health status prepend', 'health status',
                       'health status append adjectival modifier', 'health status append', 'sentence'])
    self._entHS = df

    # NOTE(review): the accumulator lists are NOT cleared before this second pass, so the
    # status table below also carries every health-status row collected above (kwList is the
    # only list left untouched, and it is not used for the status table). Kept as in the
    # original -- confirm whether the status table is meant to contain status rows only.
    for sent in self._matchedSents:
        ents = self.getCustomEnts(sent.ents, self._entityLabels[self._entID])
        if ents is None:
            continue
        for ent in ents:
            if ent._.status is not None:
                entList.append(ent.text)
                hsList.append(ent._.status)
                svList.append(ent._.ent_status_verb)
                cjList.append(ent._.conjecture)
                sentList.append(sent.text.strip('\n'))
                hsPrepend.append(ent._.status_prepend)
                hsPrependAmod.append(ent._.status_prepend_amod)
                hsAppend.append(ent._.status_append)
                hsAppendAmod.append(ent._.status_append_amod)
                negList.append(ent._.neg)
                negTextList.append(ent._.neg_text)
    ## include 'root' in the output
    dfStatus = pd.DataFrame({'entities':entList,
                             'status keywords':svList,
                             'status':hsList,
                             'conjecture':cjList,
                             'sentence':sentList,
                             'status prepend': hsPrepend,
                             'status prepend adjectival modifier':hsPrependAmod,
                             'status append': hsAppend,
                             'status append adjectival modifier': hsAppendAmod,
                             'negation':negList,
                             'negation text': negTextList})
    self._entStatus = dfStatus
    logger.info('End of health status extraction!')

    ## causal relation
    logger.info('Start to extract causal relation using OPM model information')
    self.extractRelDep(self._matchedSents)
    dfCausals = pd.DataFrame(self._extractedCausals, columns=self._causalNames)
    dfCausals.to_csv(nlpConfig['files']['output_causal_effect_file'], columns=self._causalNames)
    logger.info('End of causal relation extraction!')

    # collect general cause effect info
    logger.info('Start to use general extraction method to extract causal relation')
    matchedCauseEffectSents = self.collectCauseEffectSents(self._doc)
    extractedCauseEffects = self.extract(matchedCauseEffectSents, predSynonyms=self._causalKeywords['VERB'], exclPrepos=[])
    print(*extractedCauseEffects)
    logger.info('End of causal relation extraction using general extraction method!')
# extractHealthStatus -- summary of what the (whitespace-collapsed) body below does:
#   * For every sentence in matchedSents, the entities labelled self._entityLabels[self._entID] are
#     fetched with getCustomEnts; sentences for which that call returns None are skipped.
#   * Per entity, a health status (spacy Span or Token) is searched for according to the dependency
#     label of the entity root: nsubj/nsubjpass, pobj/dobj, compound, conj and ROOT have dedicated
#     branches; any other label is reported through logger.warning.
#   * Results are stored on the entity via ent._.set(...): 'neg', 'neg_text' and 'conjecture' always;
#     the 'health_status*' attributes when the status lemma is in the NOUN/ADJ status keywords,
#     otherwise the 'status*' attributes; 'hs_keyword' / 'ent_status_verb' from the sentence root.
#   * Nothing is returned; the assembled healthStatusText is only sent to logger.debug.
#   * predSynonyms is overwritten with self._statusKeywords['VERB'] before use, and exclPrepos is
#     not referenced in the body, so neither argument influences the result.
# NOTE(review): indentation was lost in this copy of the file and the nesting of several if/else
#   chains cannot be recovered from the tokens alone -- restore it from the repository before editing.
# NOTE(review): `lefts.remove(elem)` and `healthStatusAppendAmod.remove(elem)` mutate the list that is
#   being iterated, which can skip elements -- confirm whether that is acceptable.
# NOTE(review): re.compile(rf'{ent.text}\w*') interpolates the entity text without re.escape, so
#   entity text containing regex metacharacters would change the pattern -- confirm.
# NOTE(review): predSynonyms=[] and exclPrepos=[] are mutable default arguments.
[docs] def extractHealthStatus(self, matchedSents, predSynonyms=[], exclPrepos=[]): """ Extract health status and relation Args: matchedSents: list, the matched sentences predSynonyms: list, predicate synonyms exclPrepos: list, exclude the prepositions """ # first search degradation keywords, # if pobj, then if head.head # Ex. 1: an acrid odor in the control room --> entRoot.dep_ in ['pobj'], HS: entRoot.head.head i.e., # let i = entRoot.head.head.i, start = sent.start, nlefts = entRoot.head.head.n_lefts # healthStatus = sent[i-start-nlefts:i-start+1] # or entRoot.head in 'amod', checking the n_lefts, as above # Ex. 2: shaft degradation --> entRoot.dep_ in ['compound'], HS: entRoot.head and entRoot.head.pos_ in in ['NUM'] # Ex. 3: entRoot.dep_ in ['nsubj', 'nsubjpass'] # sent.root before ent, search left for nsubj or nsubjpass and any 'amod', 'compound', 'det' # sent.root after ent, search right for pobj, and check head.head.dep_ in ['dobj', 'nsubjpass', 'nsubj'], amend it with any 'amod', 'compound', 'det' in its lefts # should report both dobj and pobj for the health status # if sent.root, check right for 'cc' and 'conj', if next is 'cc', return the root # if entRoot.dep_ in ['conj'], if entRoot.head.dep_ in ['nmod'], return entRoot.head.head # entRoot.dep_ in ['dobj'] and entRoot.head.pos_ in ['VERB'], return the entRoot.head # dobj: left children (amod, compound, det) and right childrend, (have signs of overheating) # nsubj -- VERB -- pobj : if pobj is NUM, then everything between VERB and pobj predSynonyms = self._statusKeywords['VERB'] statusNoun = self._statusKeywords['NOUN'] statusAdj = self._statusKeywords['ADJ'] causalStatus = False for sent in matchedSents: valid = self.validSent(sent) causalEnts = None if self._causalKeywordID in self._entityLabels: causalEnts = self.getCustomEnts(sent.ents, self._entityLabels[self._causalKeywordID]) ents = self.getCustomEnts(sent.ents, self._entityLabels[self._entID]) if ents is None: continue causalStatus = 
[sent.root.lemma_.lower()] in self._causalKeywords['VERB'] and [sent.root.lemma_.lower()] not in self._statusKeywords['VERB'] for ent in ents: healthStatus = None # store health status for identified entities healthStatusAmod = None # store amod for health status healthStatusAppend = None # store some append info for health status (used for short phrase) healthStatusAppendAmod = None # store amod info for health status append info healthStatusPrepend = None # store some prepend info healthStatusPrependAmod = None # store amod info for prepend info healthStatusText = None conjecture = False passive = False entRoot = ent.root root = sent.root neg = False if valid: if entRoot.dep_ in ['nsubj', 'nsubjpass']: healthStatus, neg, negText = self.getHealthStatusForSubj(ent, ent, sent, causalStatus, predSynonyms) elif entRoot.dep_ in ['pobj', 'dobj']: if len(ents) == 1 or entRoot.dep_ in ['dobj']: healthStatus, neg, negText = self.getHealthStatusForObj(ent, ent, sent, causalStatus, predSynonyms) if entRoot.dep_ in ['pobj']: # extract append info for health status prep = entRoot.head healthStatusAppendAmod = self.getCompoundOnly(ent, ent) if len(healthStatusAppendAmod) > 0: healthStatusAppendAmod = [prep.text] + healthStatusAppendAmod healthStatusAppend = ent else: healthStatus = self.getHealthStatusForPobj(ent, include=False) if healthStatus is None: head = entRoot.head if head.dep_ in ['xcomp', 'advcl', 'relcl']: for child in head.rights: if child.dep_ in ['ccomp']: healthStatus = child break elif entRoot.dep_ in ['compound']: head = entRoot.head if head.pos_ not in ['SPACE', 'PUNCT']: if len(ents) == 1: if head.dep_ in ['compound']: head = head.head headEnt = head.doc[head.i:head.i+1] if head.dep_ in ['nsubj', 'nsubjpass']: healthStatus, neg, negText = self.getHealthStatusForSubj(headEnt, ent, sent, causalStatus, predSynonyms, include=True) if isinstance(healthStatus, Span): if entRoot.i >= healthStatus.start and entRoot.i < healthStatus.end: healthStatus = headEnt if 
healthStatus is not None: healthStatusPrepend = headEnt healthStatusPrependAmod = self.getAmodOnly(headEnt) elif head.dep_ in ['dobj', 'pobj']: healthStatus, neg, negText = self.getHealthStatusForObj(headEnt, ent, sent, causalStatus, predSynonyms, include=False) if healthStatus is not None: if isinstance(healthStatus, Span): if head not in healthStatus: # identify the dobj/pobj, and use it as append info healthStatusAppend = headEnt healthStatusAppendAmod = self.getAmodOnly(headEnt) elif isinstance(healthStatus, Token): if head != healthStatus: # identify the dobj/pobj, and use it as append info healthStatusAppend = headEnt healthStatusAppendAmod = self.getAmodOnly(headEnt) if healthStatus is None: healthStatus = headEnt else: healthStatus = entRoot.head healthStatusAmod = self.getAmodOnly(healthStatus) if len(healthStatusAmod) == 0: lefts = list(healthStatus.lefts) # remove entity itself for elem in lefts: if elem in ent: lefts.remove(elem) if len(lefts) != 0: healthStatusAmod = [e.text for e in lefts] if head.dep_ in ['dobj','pobj','nsubj', 'nsubjpass'] and [root.lemma_.lower()] in predSynonyms: ent._.set('hs_keyword', root.lemma_) else: healthStatus = self.getAmod(ent, ent.start, ent.end, include=False) elif entRoot.dep_ in ['conj']: # TODO: recursive function to retrieve non-conj healthStatus = self.getAmod(ent, ent.start, ent.end, include=False) if healthStatus is None: head = entRoot.head if head.dep_ in ['conj']: head = head.head headEnt = head.doc[head.i:head.i+1] if head.dep_ in ['nsubj', 'nsubjpass']: healthStatus, neg, negText = self.getHealthStatusForSubj(headEnt, ent, sent, causalStatus, predSynonyms) elif head.dep_ in ['pobj', 'dobj']: healthStatus = self.getHealthStatusForPobj(headEnt, include=False) if healthStatus is None: healthStatus, neg, negText = self.getHealthStatusForObj(headEnt, ent, sent, causalStatus, predSynonyms) elif entRoot.dep_ in ['ROOT']: healthStatus = self.getAmod(ent, ent.start, ent.end, include=False) if healthStatus is None: 
rights =[tk for tk in list(entRoot.rights) if tk.pos_ in ['VERB', 'NOUN', 'ADJ', 'ADV'] and tk.i >= ent.end] if len(rights) > 0: healthStatus = rights[0] else: logger.warning(f'Entity "{ent}" dep_ is "{entRoot.dep_}" is not among valid list "[nsubj, nsubjpass, pobj, dobj, compound]"') if entRoot.head == root: headEnt = root.doc[root.i:root.i+1] if ent.start < root.i: if [root.lemma_.lower()] in predSynonyms: ent._.set('hs_keyword', root.lemma_) else: ent._.set('ent_status_verb', root.lemma_) neg, negText = self.isNegation(root) passive = self.isPassive(root) # # last is punct, the one before last is the root # if root.nbor().pos_ in ['PUNCT']: # healthStatus = root healthStatus = self.findRightObj(root) if healthStatus and healthStatus.dep_ == 'pobj': healthStatus = self.getHealthStatusForPobj(healthStatus, include=True) elif healthStatus and healthStatus.dep_ == 'dobj': subtree = list(healthStatus.subtree) nbor = self.getNbor(healthStatus) if nbor is not None and nbor.dep_ in ['prep']: healthStatus = healthStatus.doc[healthStatus.i:subtree[-1].i+1] # no object is found if not healthStatus: healthStatus = self.findRightKeyword(root) # last is punct, the one before last is the root nbor = self.getNbor(root) if not healthStatus and nbor is not None and nbor.pos_ in ['PUNCT']: healthStatus = root if healthStatus is None: healthStatus = self.getAmod(ent, ent.start, ent.end, include=False) if healthStatus is None: healthStatus = root else: if [root.lemma_.lower()] in predSynonyms: ent._.set('hs_keyword', root.lemma_) else: ent._.set('ent_status_verb', root.lemma_) passive = self.isPassive(root) neg, negText = self.isNegation(root) healthStatus = self.findLeftSubj(root, passive) if healthStatus is not None: healthStatus = self.getAmod(healthStatus, healthStatus.i, healthStatus.i+1, include=True) else: # handle short phrase healthStatus = self.getAmod(ent, ent.start, ent.end, include=False) # search right start = None end = None if not ent[-1].is_sent_end and ent[-1].dep_ 
in ['compound']: start = ent.end end = None for i, tk in enumerate(sent.doc[ent.end:sent[-1].i+1]): # assume SPACE got removed already if tk.pos_ in ['PUNCT']: break if tk == sent.doc[tk.i-1].head: end = tk.i+1 else: break if end is not None: healthStatusAppend = sent.doc[start:end] else: if ent[-1].head != ent[-1]: ind = ent[-1].head.i healthStatusAppend = sent.doc[ind:ind+1] if healthStatusAppend is not None: healthStatusAppendAmod = self.getAmodOnly(healthStatusAppend) if not healthStatusAppendAmod: healthStatusAppendAmod = self.getCompoundOnly(healthStatusAppend, ent) for elem in healthStatusAppendAmod: if elem in ent.text: healthStatusAppendAmod.remove(elem) if healthStatus is None: healthStatus = healthStatusAppend healthStatusAmod = healthStatusAppendAmod # reset healthStatusAppend = None healthStatusAppendAmod = None # handle conjuncts if healthStatus is None and len(ent.conjuncts) > 0: conjunct = sent.doc[ent.conjuncts[0].i: ent.conjuncts[0].i+1] healthStatus = self.getAmod(conjunct, conjunct.start, conjunct.end, include=False) if healthStatus is None: ent._.set('health_status',conjunct._.health_status) ent._.set('status',conjunct._.status) ent._.set('hs_keyword',conjunct._.hs_keyword) ent._.set('ent_status_verb',conjunct._.ent_status_verb) ent._.set('conjecture',conjunct._.conjecture) if healthStatus is None: continue _healthStatus = False if isinstance(healthStatus, Span): conjecture = self.isConjecture(healthStatus.root.head) if healthStatus.root.lemma_ in statusNoun + statusAdj: _healthStatus = True elif isinstance(healthStatus, Token): conjecture = self.isConjecture(healthStatus.head) if healthStatus.lemma_ in statusNoun + statusAdj: _healthStatus = True if not neg: if isinstance(healthStatus, Span): neg, negText = self.isNegation(healthStatus.root) else: neg, negText = self.isNegation(healthStatus) # conjecture = self.isConjecture(healthStatus.head) # neg, negText = self.isNegation(healthStatus) ent._.set('neg',neg) ent._.set('neg_text',negText) 
ent._.set('conjecture',conjecture) if _healthStatus: ent._.set('health_status',healthStatus) ent._.set('health_status_prepend', healthStatusPrepend) ent._.set('health_status_prepend_amod',healthStatusPrependAmod) ent._.set('health_status_amod',healthStatusAmod) ent._.set('health_status_append',healthStatusAppend) ent._.set('health_status_append_amod',healthStatusAppendAmod) else: ent._.set('status',healthStatus) ent._.set('status_prepend', healthStatusPrepend) ent._.set('status_prepend_amod',healthStatusPrependAmod) ent._.set('status_amod',healthStatusAmod) ent._.set('status_append',healthStatusAppend) ent._.set('status_append_amod',healthStatusAppendAmod) prependAmodText = ' '.join(healthStatusPrependAmod) if healthStatusPrependAmod is not None else '' prependText = healthStatusPrepend.text if healthStatusPrepend is not None else '' amodText = ' '.join(healthStatusAmod) if healthStatusAmod is not None else '' appendAmodText = ' '.join(healthStatusAppendAmod) if healthStatusAppendAmod is not None else '' if healthStatusAppend is not None and healthStatusAppend != ent: appText = healthStatusAppend.root.head.text + ' ' + healthStatusAppend.text if healthStatusAppend.root.dep_ in ['pobj'] else healthStatusAppend.text else: appText = '' healthStatusText = ' '.join(list(filter(None, [prependAmodText, prependText, amodText, healthStatus.text, appendAmodText,appText]))).strip() if neg: healthStatusText = ' '.join([negText,healthStatusText]) if isinstance(healthStatus, Span): if ent.start > healthStatus.start and ent.end < healthStatus.end: # remove entity info in healthStatusText pn = re.compile(rf'{ent.text}\w*') healthStatusText = re.sub(pn, '', healthStatusText).strip() # healthStatusText = healthStatusText.replace(ent.text, '') logger.debug(f'{ent} health status: {healthStatusText}')
# ent._.set('health_status', healthStatusText) # ent._.set('conjecture',conjecture)
def findHealthStatus(self, root, deps):
    """
      Return the first token in the dependency tree of root (root included) that matches
      the dependency list, using a breadth first search

      A token matches when its own dependency label is in ``deps``, or when it is a
      ``compound`` whose head has a dependency label in ``deps``.

      Args:

        root: spacy.tokens.Token, the root token
        deps: list, the dependency list

      Returns:

        child: spacy.tokens.Token or None, the token represents the health status, None when
          no token in the tree matches
    """
    toVisit = deque([root]) # queue for bfs
    while toVisit:
        child = toVisit.popleft()
        if child.dep_ in deps:
            # TODO: handle a preposition that follows the match. Checking only the right
            # neighbour is not enough, since e.g. 'during' would also satisfy a 'prep' check.
            return child
        if child.dep_ == 'compound' and child.head.dep_ in deps:
            # the token is contained in a compound whose head matches
            return child
        toVisit.extend(child.children)
    return None
def isValidCausalEnts(self, ent):
    """
      Check whether a group of entities contains at least one valid causal entity

      An entity is accepted when the dependency label of its root, or of the head of its
      root, is one of the subject/object style labels listed below.

      Args:

        ent: list, list of entities

      Returns:

        valid: bool, True if any entity in the list is a valid causal entity
    """
    acceptedDeps = ('nsubj', 'nsubjpass', 'nsubj:pass', 'pobj', 'dobj', 'iobj', 'obj', 'obl', 'oprd')
    return any(
        span.root.dep_ in acceptedDeps or span.root.head.dep_ in acceptedDeps
        for span in ent
    )
def getSSCEnt(self, entList, index, direction='left'):
    """
      Get the closest group of SSC entities

      Groups of SSC entities are the elements of ``entList`` that are lists; the search
      starts at ``index`` (included) and walks towards the requested direction.

      Args:

        entList: list, list of entities
        index: int, the start location of entity
        direction: str, 'left' or 'right' (case insensitive), the search direction

      Returns:

        ent: list or None, the closest group of SSC entities, None when no group exists in
          the requested direction or the direction is not recognized
    """
    direction = direction.lower()
    if direction == 'left':
        positions = range(index, -1, -1)
    elif direction == 'right':
        positions = range(index, len(entList))
    else:
        return None
    for pos in positions:
        candidate = entList[pos]
        if isinstance(candidate, list): # we may check the ent label here
            return candidate
    # no group found: report None rather than the last non-group element that was inspected
    return None
# extractRelDep -- summary of what the (whitespace-collapsed) body below does:
#   * Sentences are skipped when self._causalKeywordID is not a key of self._entityLabels, when
#     getCustomEnts finds no causal keyword entity (None), or when fewer than two SSC entity groups
#     remain after getConjuncts (such sentences are appended to self._causalSentsNoEnts or
#     self._causalSentsOneEnt respectively).
#   * The entities of the sentence, with the sentence root (plus negation/conjecture markers) merged
#     in by position, are appended to self._rawCausalList.
#   * For every causal keyword entity the cause/effect lists are chosen from the valid SSC groups on
#     its left/right, with special handling that depends on the POS/dependency of the keyword root
#     (root VERB, non-root VERB, causal/effect NOUN, causal/effect relator); accepted pairs are passed
#     to self.collectExtactedCausals.
#   * The docstring announces a generator, but the body contains no return/yield statement;
#     allCauseEffectPairs is filled and only referenced by the commented-out print at the end.
# NOTE(review): indentation was lost in this copy of the file and the nesting of the elif chains
#   (e.g. whether the '*-relator' branches belong to the NOUN branch) cannot be recovered from the
#   tokens alone -- restore it from the repository before editing.
# NOTE(review): both calls `self.identifyCauseEffectForAttr(self, cRootHead, ...)` pass `self`
#   explicitly in addition to the bound instance, so the arguments are shifted by one position; the
#   call that also passes reverse=True then supplies `reverse` twice. Looks like a bug -- confirm.
# NOTE(review): `token.lemma_ != "DET"` compares a lemma with what looks like a part-of-speech tag;
#   presumably `token.pos_` was intended -- verify against how self._causalKeywords is built.
# NOTE(review): cRootLoc is assigned but not referenced again, and the second
#   `if causeList is None or effectList is None: continue` repeats the check made just before it.
[docs] def extractRelDep(self, matchedSents): """ Args: matchedSents: list, the list of matched sentences Returns: (subject tuple, predicate, object tuple): generator, the extracted causal relation """ allCauseEffectPairs = [] for sent in matchedSents: if self._causalKeywordID in self._entityLabels: causalEnts = self.getCustomEnts(sent.ents, self._entityLabels[self._causalKeywordID]) else: continue sscEnts = self.getCustomEnts(sent.ents, self._entityLabels[self._entID]) sscEnts = self.getConjuncts(sscEnts) logger.debug(f'Conjuncts pairs: {sscEnts}') if causalEnts is None: # no causal keyword is found, skipping continue if len(sscEnts) == 0: logger.debug(f'No entity is identified in "{sent.text}"') self._causalSentsNoEnts.append(sent) continue if len(sscEnts) == 1: logger.debug(f'Entity "({sent.ents})" is identified in "{sent.text}"') self._causalSentsOneEnt.append(sent) continue # shows keywords can be used to identify the causal sentences, but there are some false positive cases logger.debug(f'Sentence contains causal keywords: {causalEnts}. 
\n {sent.text}') # grab all ents labelList = self._entityLabels[self._causalKeywordID].union(self._entityLabels[self._entID]) ents = self.getCustomEnts(sent.ents, labelList) mEnts = copy.copy(ents) root = sent.root i = root.i neg, negText = self.isNegation(root) conjecture = self.isConjecture(root) if neg: if conjecture: rootTuple = [('conjecture', conjecture), ('negation', neg), ('negation text',negText), root] else: rootTuple = [('negation', neg), ('negation text',negText), root] else: if conjecture: rootTuple = [('conjecture', conjecture), root] else: rootTuple = [root] idx = -1 for j, ent in enumerate(ents): start = ent.start if i < start: idx = j break if i == start: rootTuple[-1] = ent mEnts[j] = rootTuple break if idx != -1: mEnts.insert(j, rootTuple) logger.debug(f'Causal Info: {mEnts}') self._rawCausalList.append(mEnts) entTuples = [(ent[0].start, ent) for ent in sscEnts] + [(ent.start, ent) for ent in causalEnts] orderedEnts = sorted(entTuples, key = lambda x:x[0]) orderedEnts = [ent[1] for ent in orderedEnts] # Loop over causal keywords, make functions, for each of [verb, noun, transition] # Define rules for each functions causeEffectPair = [] skipCEnts = [] rootCause = None for i, cEnt in enumerate(causalEnts): if cEnt in skipCEnts: continue cRoot = cEnt.root cRootLoc = cRoot.i causalEntLemma = [token.lemma_.lower() for token in cEnt if token.lemma_ != "DET"] rightSSCEnts = self.getRightSSCEnts(cEnt, orderedEnts) leftSSCEnts = self.getLeftSSCEnts(cEnt, orderedEnts) validLeftSSCEnts = self.selectValidEnts(leftSSCEnts, cEnt) validRightSSCEnts = self.selectValidEnts(rightSSCEnts, cEnt) # initial assignment causeList = validLeftSSCEnts effectList = validRightSSCEnts if validLeftSSCEnts is None and validRightSSCEnts is None: logger.debug(f'No causal/effect entities exist in "{sent}"') continue if cRoot.pos_ == 'VERB' and cRoot == sent.root: passive = self.isPassive(root) conjecture = self.isConjecture(cRoot) if causeList is None: subj = self.findSubj(cRoot, 
passive) if subj is not None: causeList = [[subj]] if effectList is None: obj = self.findObj(cRoot) if obj is not None: effectList = [[obj]] if passive: causeList, effectList = effectList, causeList rootCause = (causeList, effectList, conjecture) elif cRoot.pos_ == 'VERB' and cRoot != sent.root: conjecture = self.isConjecture(cRoot) if validRightSSCEnts is None: continue causeList, effectList = self.identifyCauseEffectForClauseModifier(cRoot, rootCause, validLeftSSCEnts, validRightSSCEnts) elif cRoot.pos_ == 'NOUN': if causalEntLemma in self._causalKeywords['causal-noun']: cRootHead = cRoot.head conjecture = self.isConjecture(cRootHead) if validRightSSCEnts is None: continue if cRootHead.dep_ in ['xcomp', 'advcl', 'relcl']: causeList, effectList = self.identifyCauseEffectForClauseModifier(cRootHead, rootCause, validLeftSSCEnts, validRightSSCEnts) elif cRoot.dep_ in ['attr']: causeList, effectList = self.identifyCauseEffectForAttr(self, cRootHead, validLeftSSCEnts, validRightSSCEnts) if rootCause is None: rootCause = (causeList, effectList, conjecture) elif cRoot.dep_ in ['nsubj']: causeList, effectList, skip = self.identifyCauseEffectForNsuj(cRoot, i, causalEnts, orderedEnts, validRightSSCEnts, reverse=True) skipCEnts.extend(skip) if rootCause is None: rootCause = (causeList, effectList, conjecture) else: continue elif causalEntLemma in self._causalKeywords['effect-noun']: cRootHead = cRoot.head conjecture = self.isConjecture(cRootHead) if validRightSSCEnts is None: continue if cRootHead.dep_ in ['xcomp', 'advcl', 'relcl']: causeList, effectList = self.identifyCauseEffectForClauseModifier(cRootHead, rootCause, validLeftSSCEnts, validRightSSCEnts, reverse=True) elif cRoot.dep_ in ['attr']: causeList, effectList = self.identifyCauseEffectForAttr(self, cRootHead, validLeftSSCEnts, validRightSSCEnts, reverse=True) if rootCause is None: rootCause = (causeList, effectList, conjecture) elif cRoot.dep_ in ['nsubj']: causeList, effectList, skip = 
self.identifyCauseEffectForNsuj(cRoot, i, causalEnts, orderedEnts, validRightSSCEnts, reverse=False) skipCEnts.extend(skip) if rootCause is None: rootCause = (causeList, effectList, conjecture) else: continue elif causalEntLemma in self._causalKeywords['causal-relator']: if causeList is None: causeList = rootCause elif causalEntLemma in self._causalKeywords['effect-relator']: if validLeftSSCEnts is not None and validRightSSCEnts is not None: causeList, effectList = effectList, causeList elif validLeftSSCEnts is None and validRightSSCEnts is not None: causeList, effectList, skip = self.identifyCauseEffectForNsuj(cRoot, i, causalEnts, orderedEnts, validRightSSCEnts, reverse=False) skipCEnts.extend(skip) if rootCause is None: rootCause = (causeList, effectList, conjecture) else: continue if causeList is None or effectList is None: logger.warning(f"Issue found: 'cause list': {causeList}, and 'effect list': {effectList} were identified in sentence '{sent}'") continue if causeList is None or effectList is None: continue causeEffectPair.append((causeList, effectList, conjecture)) if isinstance(causeList, tuple): self.collectExtactedCausals(causeList[0], effectList, cEnt, sent, conjecture) # effect can be cause for next effect self.collectExtactedCausals(causeList[1], effectList, cEnt, sent, conjecture) elif isinstance(effectList, tuple): self.collectExtactedCausals(causeList, effectList[0], cEnt, sent, conjecture) self.collectExtactedCausals(causeList, effectList[1], cEnt, sent, conjecture) else: self.collectExtactedCausals(causeList, effectList, cEnt, sent, conjecture) if len(causeEffectPair) != 0: allCauseEffectPairs.append(causeEffectPair)
# print("Identified Cause-Effect Pairs:") # for elem in allCauseEffectPairs: # for i in elem: # print(i)
# identifyCauseEffectForNsuj -- summary of what the (whitespace-collapsed) body below does:
#   * The valid right-hand SSC groups are split into cause/effect with splitEntsFollowingNounCausal.
#   * A missing cause falls back to the object found by findObj(cRoot).
#   * A missing effect is looked up through the next causal entity (when its lemma list is in
#     self._causalKeywords['effect-relator'] if reverse else ['causal-relator']) or through findObj;
#     a next causal entity that gets consumed is reported back in skipCEnts.
#   * With reverse=True the two lists are swapped before (causeList, effectList, skipCEnts) is returned.
# NOTE(review): indentation was lost in this copy of the file; where `skipCEnts.append(nextCEnt)` sits
#   and which `if` the final `else:` belongs to cannot be recovered from the tokens alone -- restore
#   the nesting from the repository before editing.
# NOTE(review): `token.lemma_ != "DET"` compares a lemma with what looks like a part-of-speech tag;
#   presumably `token.pos_` was intended -- confirm.
[docs] def identifyCauseEffectForNsuj(self, cRoot, cEntsIndex, causalEnts, orderedEnts, validRightSSCEnts, reverse=False): """ Identify the cause effect pairs for nsubj Args: cRoot: Token, the root of causal entity cEntsIndex: int, the index for the causal entity causalEnts: list, the list of causal entities orderedEnts: list, the entities ordered by their locations in the Doc validRightSSCEnts: list, the valid list of entities on the right of given causal entity reverse: bool, reverse the cause effect relation if True Returns: cause effect pairs, tuple, (causeList, effectList, skipCEnts) """ causeList = None effectList = None skipCEnts = [] if reverse: cKey = 'effect-relator' else: cKey = 'causal-relator' causeList, effectList = self.splitEntsFollowingNounCausal(cRoot, validRightSSCEnts) if causeList is None: obj = self.findObj(cRoot) if obj is not None: causeList = [[obj]] if effectList is None: if cEntsIndex < len(causalEnts) - 1: nextCEnt = causalEnts[cEntsIndex+1] nextCEntLemma = [token.lemma_.lower() for token in nextCEnt if token.lemma_ != "DET"] if nextCEntLemma in self._causalKeywords[cKey]: ents = self.getRightSSCEnts(nextCEnt, orderedEnts) validEnts = self.selectValidEnts(ents, nextCEnt) if validEnts is not None: effectList = validEnts else: obj = self.findObj(nextCEnt.root) if obj is not None: effectList = [[obj]] skipCEnts.append(nextCEnt) else: obj = self.findObj(cRoot.head) if obj is not None: effectList = [[obj]] if reverse: causeList, effectList = effectList, causeList return causeList, effectList, skipCEnts
# identifyCauseEffectForAttr -- summary of what the (whitespace-collapsed) body below does:
#   * When validLeftSSCEnts is not None it becomes the cause list and validRightSSCEnts the effect list.
#   * Otherwise the subject returned by findSubj(cRoot, isPassive(cRoot)) is wrapped as [[subj]] and
#     used as the cause list, with validRightSSCEnts as the effect list.
#   * reverse=True returns the pair in swapped order, i.e. (effectList, causeList).
# NOTE(review): indentation was lost in this copy of the file; whether the second
#   `effectList = validRightSSCEnts` is executed only when a subject was found cannot be recovered
#   from the tokens alone -- restore the nesting from the repository before editing.
# NOTE(review): extractRelDep calls this method as self.identifyCauseEffectForAttr(self, cRootHead, ...),
#   i.e. with one positional argument more than this signature expects -- confirm and fix the caller.
[docs] def identifyCauseEffectForAttr(self, cRoot, validLeftSSCEnts, validRightSSCEnts, reverse=False): """ Identify the cause effect pairs for attr Args: cRoot: Token, the root of causal entity validLeftSSCEnts: list, the valid list of entities on the left of given causal entity validRightSSCEnts: list, the valid list of entities on the right of given causal entity reverse: bool, reverse the cause effect relation if True Returns: cause effect pairs, tuple, (causeList, effectList) """ causeList = None effectList = None if validLeftSSCEnts is not None: causeList = validLeftSSCEnts effectList = validRightSSCEnts else: passive = self.isPassive(cRoot) subj = self.findSubj(cRoot, passive) if subj is not None: causeList = [[subj]] effectList = validRightSSCEnts if reverse: return effectList, causeList else: return causeList, effectList
# identifyCauseEffectForClauseModifier -- summary of what the (whitespace-collapsed) body below does:
#   * Returns (None, None) right away when cRoot.dep_ is not one of 'xcomp', 'advcl', 'relcl'
#     (this early return ignores `reverse`).
#   * When rootCause is not None, the whole rootCause tuple is used as the cause list and
#     validRightSSCEnts as the effect list.
#   * Otherwise validLeftSSCEnts is the cause list if it is not None; failing that, the subject of
#     cRoot.head (findSubj with the isPassive flag of that head) is wrapped as [[subj]].
#   * reverse=True returns the pair in swapped order, i.e. (effectList, causeList).
# NOTE(review): indentation was lost in this copy of the file; whether the last
#   `effectList = validRightSSCEnts` is executed only when a subject was found cannot be recovered
#   from the tokens alone -- restore the nesting from the repository before editing.
[docs] def identifyCauseEffectForClauseModifier(self, cRoot, rootCause, validLeftSSCEnts, validRightSSCEnts, reverse=False): """ Identify the cause effect pairs for clause modifier Args: cRoot: Token, the root of causal entity rootCause: tuple, list of causes validLeftSSCEnts: list, the valid list of entities on the left of given causal entity validRightSSCEnts: list, the valid list of entities on the right of given causal entity reverse: bool, reverse the cause effect relation if True Returns: cause effect pairs, tuple, (causeList, effectList) """ causeList = None effectList = None # xcomp: open clausal complement i.e., rendering ..., causing ... # advcl: adverbial clause modifier i.e., ... which disabled ... # relcl: relative clause modifier i.e. ..., which disabled ... if cRoot.dep_ not in ['xcomp', 'advcl', 'relcl']: return causeList, effectList if rootCause is not None: # using rootCause as the cause causeList = rootCause effectList = validRightSSCEnts else: if validLeftSSCEnts is not None: causeList = validLeftSSCEnts effectList = validRightSSCEnts else: head = cRoot.head passive = self.isPassive(head) subj = self.findSubj(head, passive) if subj is not None: causeList = [[subj]] effectList = validRightSSCEnts if reverse: return effectList, causeList else: return causeList, effectList
def splitEntsFollowingNounCausal(self, cRoot, validRightSSCEnts):
    """
      Split the entities into cause, effect

      A group whose first entity root lies in the subtree of the causal root is a cause;
      otherwise, a group whose first entity root lies in the subtree of the head of the
      causal root is an effect; any other group is dropped.

      Args:

        cRoot: Token, the root of causal entity
        validRightSSCEnts: list, the valid list of entities on the right of given causal entity

      Returns:

        cause effect pairs, tuple, (cause, effect), each one is None when it would be empty
    """
    if validRightSSCEnts is None:
        return None, None
    causeGroups, effectGroups = [], []
    for group in validRightSSCEnts:
        groupRoot = group[0].root
        if groupRoot in cRoot.subtree:
            causeGroups.append(group)
        elif groupRoot in cRoot.head.subtree:
            effectGroups.append(group)
    return causeGroups or None, effectGroups or None
def getRightSSCEnts(self, cEnt, orderedEnts):
    """
      Get the SSC ents on the right of causal entity

      Only the uninterrupted run of SSC groups (list elements) that directly follows the
      causal entity is collected; the scan stops at the first element that is not a list.

      Args:

        cEnt: Span, causal entity
        orderedEnts: list, the entities ordered by their locations in the Doc

      Returns:

        selEnts: list or None, list of SSC entities, None when there is none
    """
    position = self.getIndex(cEnt, orderedEnts)
    lastPosition = len(orderedEnts) - 1
    if position == lastPosition:
        return None
    collected = []
    cursor = position + 1
    while cursor <= lastPosition and isinstance(orderedEnts[cursor], list):
        collected.append(orderedEnts[cursor])
        cursor += 1
    return collected or None
def getLeftSSCEnts(self, cEnt, orderedEnts):
    """
      Get the SSC ents on the left of causal entity

      Only the uninterrupted run of SSC groups (list elements) that directly precedes the
      causal entity is collected, closest group first; the scan stops at the first element
      that is not a list.

      Args:

        cEnt: Span, causal entity
        orderedEnts: list, the entities ordered by their locations in the Doc

      Returns:

        selEnts: list or None, list of SSC entities, None when there is none
    """
    position = self.getIndex(cEnt, orderedEnts)
    if position == 0:
        return None
    collected = []
    cursor = position - 1
    while cursor >= 0 and isinstance(orderedEnts[cursor], list):
        collected.append(orderedEnts[cursor])
        cursor -= 1
    return collected or None
def selectValidEnts(self, ents, cEnt):
    """
      Select the valid ents that are within subtree of causal entity

      A group is kept when it passes ``isValidCausalEnts`` and the root of its first entity
      lies in the subtree of the causal entity root or in the subtree of that root's head.

      Args:

        ents: list, the list of entities
        cEnt: Span, causal entity

      Returns:

        validEnts: list or None, list of valid entities, None when ents is None or nothing is kept
    """
    if ents is None:
        return None
    kept = [
        group for group in ents
        if self.isValidCausalEnts(group)
        and (group[0].root in cEnt.root.subtree or group[0].root in cEnt.root.head.subtree)
    ]
    return kept or None
def collectExtactedCausals(self, cause, effect, causalKeyword, sent, conjecture=None):
    """
    Collect the extracted causal relations

    Every (cause entity, effect entity) combination is logged at debug level
    and appended to ``self._extractedCausals`` as
    ``[cause, cause health status, keyword, effect, effect health status, sent, conjecture]``.

    Args:

      cause: list, list of causes (each item is itself a list of entities)
      effect: list, list of effects (each item is itself a list of entities)
      causalKeyword: str, causal keyword
      sent: spacy.tokens.Span, sentence with identified causal relations
      conjecture: bool or None, conjecture flag; when None it is obtained from
        ``self.isConjecture`` applied to the sentence root

    Returns:

      None
    """
    root = sent.root
    if conjecture is None:
        conjecture = self.isConjecture(root)

    # the effect groups are only iterated once a cause entity is available
    for c in (item for group in cause for item in group):
        for e in (item for group in effect for item in group):
            logger.debug(f'({c} health status: {c._.health_status}) "{causalKeyword}" ({e} health status: {e._.health_status}), conjecture: "{conjecture}"')
            record = [c, c._.health_status, causalKeyword, e, e._.health_status, sent, conjecture]
            self._extractedCausals.append(record)
def collectCauseEffectSents(self, doc):
    """
    Collect data of matched sentences that contain cause-effect keywords

    A sentence is selected when at least one of its entities has an
    ``ent_id_`` equal to ``self._causalKeywordID``.

    Args:

      doc: spacy.tokens.doc.Doc, the processed document using nlp pipelines

    Returns:

      matchedSents: list, the selected sentences in document order, without duplicates
    """
    matchedSents = []
    for sent in doc.sents:
        hasCausalKeyword = any(ent.ent_id_ == self._causalKeywordID for ent in sent.ents)
        if hasCausalKeyword and sent not in matchedSents:
            matchedSents.append(sent)
    return matchedSents
def getHealthStatusForPobj(self, ent, include=False):
    """Get the status for ent root pos ``pobj``

    The search is driven by the grandparent of the entity root (the head of
    the preposition governing the entity): depending on its dependency label
    and part of speech, the status is taken from the grandparent phrase, from
    the object on the right of a causal verb, from a preceding direct object,
    or from the modifiers of the entity itself.

    Args:

      ent: Span, the span of entity (a single Token is accepted as well)
      include: bool, ent will be included in returned status if True

    Returns:

      Span or Token, the identified health status, None if the entity root is
      not a ``pobj`` or no status is identified
    """
    healthStatus = None
    if isinstance(ent, Token):
        root = ent
        start = root.i
        end = start + 1
    elif isinstance(ent, Span):
        root = ent.root
        start = ent.start
        end = ent.end
    if root.dep_ not in ['pobj']:
        return healthStatus
    grandparent = root.head.head
    parent = root.head
    # a verb counts as causal only when it is a causal keyword that is not also a status keyword
    causalStatus = [grandparent.lemma_.lower()] in self._causalKeywords['VERB'] and [grandparent.lemma_.lower()] not in self._statusKeywords['VERB']
    if grandparent.dep_ in ['dobj', 'nsubj', 'nsubjpass', 'pobj']:
        # start the status at the leftmost child of the grandparent (or at the grandparent itself)
        lefts = list(grandparent.lefts)
        if len(lefts) == 0:
            leftInd = grandparent.i
        else:
            leftInd = lefts[0].i
        if not include:
            rights = list(grandparent.rights)
            if grandparent.n_rights > 1 and rights[-1] == parent:
                healthStatus = grandparent.doc[leftInd:rights[-1].i]
            else:
                healthStatus = grandparent.doc[leftInd:grandparent.i+1]
        else:
            healthStatus = grandparent.doc[leftInd:end]
        healthStatus = self.getAmod(healthStatus, healthStatus.start, healthStatus.end, include=True)
    elif grandparent.pos_ in ['VERB'] and causalStatus:
        healthStatus = self.findRightObj(grandparent)
        # findRightObj may not find any object: only inspect the result when there is one
        if healthStatus is not None:
            subtree = list(healthStatus.subtree)
            nbor = self.getNbor(healthStatus)
            if nbor is not None and nbor.dep_ in ['prep'] and subtree[-1].i < root.i:
                # extend the object with its prepositional phrase located before the entity
                healthStatus = grandparent.doc[healthStatus.i:subtree[-1].i+1]
            elif healthStatus.i >= root.i:
                # an object at or after the entity root is not accepted as status
                healthStatus = None
    elif grandparent.pos_ in ['VERB'] and grandparent.dep_ in ['ROOT']:
        # prefer a direct object of the root verb located before the entity
        dobj = [tk for tk in grandparent.rights if tk.dep_ in ['dobj'] and tk.i < start]
        if len(dobj) > 0:
            dobjEnt = root.doc[dobj[0].i:dobj[0].i+1]
            healthStatus = self.getAmod(dobjEnt, dobjEnt.start, dobjEnt.end, include=True)
        else:
            healthStatus = self.getAmod(ent, start, end, include=include)
    elif grandparent.pos_ in ['NOUN']:
        grandEnt = grandparent.doc[grandparent.i:grandparent.i+1]
        healthStatus = self.getAmod(grandEnt, grandparent.i, grandparent.i+1, include=True)
    elif grandparent.pos_ in ['AUX']:
        # text between the auxiliary and the preposition
        healthStatus = grandparent.doc[grandparent.i+1:parent.i]
    else:
        # search lefts for amod
        healthStatus = self.getAmod(ent, start, end, include)
    return healthStatus
def getHealthStatusForSubj(self, ent, entHS, sent, causalStatus, predSynonyms, include=False):
    """
    Get the status for nsubj/nsubjpass ent

    The head of the entity root is used as predicate. Note that the incoming
    ``causalStatus`` value is not used: it is recomputed from that predicate.

    Args:

      ent: Span, the nsubj/nsubjpass ent that will be used to search status
      entHS: Span, the entHS that the status will be associated with
      sent: Span, the sent that includes the ent, entHS and status
      causalStatus: bool, the causal status for the ent
      predSynonyms: list, predicate synonyms
      include: bool, include ent in the returned expression if True

    Returns:

      (healthStatus, neg, negText): tuple, the identified status (Span or
      Token, possibly None), followed by the two values returned by
      ``self.isNegation`` (``False`` and ``''`` when it is not called)
    """
    healthStatus, neg, negText = None, False, ''
    entRoot = ent.root
    # the predicate is the head of the entity root, not the sentence root
    pred = entRoot.head
    predLemma = [pred.lemma_.lower()]
    # causal only when the predicate is a causal keyword that is not also a status keyword
    causalStatus = predLemma in self._causalKeywords['VERB'] and predLemma not in self._statusKeywords['VERB']
    if entRoot.dep_ not in ['nsubj', 'nsubjpass']:
        return healthStatus, neg, negText

    # --- non-verbal predicate ---
    if pred.pos_ != 'VERB':
        neg, negText = self.isNegation(pred)
        if pred.pos_ in ['NOUN', 'ADJ']:
            # TODO: search entRoot.lefts for 'amod' and attach to healthStatus
            healthStatus = pred
        elif pred.pos_ in ['AUX']:
            healthStatus = pred.doc[pred.i+1:pred.i+pred.n_rights+1]
        else:
            logger.warning(f'No status identified for "{ent}" in "{sent}"')
        return healthStatus, neg, negText

    # --- verbal predicate ---
    # one flag per right child (punctuation/space skipped): is it a clause modifier?
    clauseFlags = [tk.dep_ in ['advcl', 'relcl'] for tk in pred.rights if tk.pos_ not in ['PUNCT', 'SPACE']]
    nbor = self.getNbor(pred)
    followedByBreak = nbor is not None and (nbor.dep_ in ['cc'] or nbor.pos_ in ['PUNCT'])
    onlyClauseChildren = len(clauseFlags) > 0 and all(clauseFlags)
    if followedByBreak or onlyClauseChildren:
        # the verb itself is the status
        healthStatus = pred
    elif not causalStatus:
        if predLemma in predSynonyms:
            entHS._.set('hs_keyword', pred.lemma_)
        else:
            entHS._.set('ent_status_verb', pred.lemma_)
        neg, negText = self.isNegation(pred)
        # the passive flag is computed but not used below
        passive = self.isPassive(pred)
        healthStatus = self.findRightObj(pred)
        if healthStatus and healthStatus.dep_ == 'pobj':
            healthStatus = self.getHealthStatusForPobj(healthStatus, include=True)
        elif healthStatus and healthStatus.dep_ == 'dobj':
            subtree = list(healthStatus.subtree)
            try:
                # extend the direct object with the prepositional phrase that follows it
                if healthStatus.nbor().dep_ in ['prep']:
                    healthStatus = healthStatus.doc[healthStatus.i:subtree[-1].i+1]
            except IndexError:
                pass
        # no object is found
        if not healthStatus:
            healthStatus = self.findRightKeyword(pred)
        if healthStatus is None:
            healthStatus = self.getAmod(ent, ent.start, ent.end, include=include)
        if healthStatus is None:
            extra = [tk for tk in pred.rights if tk.pos_ in ['ADP', 'ADJ']]
            # Only select the first ADP and combine with root
            if len(extra) > 0:
                healthStatus = pred.doc[pred.i:extra[0].i+1]
            else:
                healthStatus = pred
    else:
        healthStatus = self.getAmod(ent, ent.start, ent.end, include=include)
        if healthStatus is None:
            healthStatus = pred
    return healthStatus, neg, negText
def getHealthStatusForObj(self, ent, entHS, sent, causalStatus, predSynonyms, include=False):
    """
    Get the status for pobj/dobj ent

    The predicate used for the search is the head of the entity root, or the
    head of that head when the entity is governed by a preposition. Note that
    the incoming ``causalStatus`` value is not used: it is recomputed from
    that predicate.

    Args:

      ent: Span, the pobj/dobj ent that will be used to search status
      entHS: Span, the entHS that the status will be associated with
      sent: Span, the sent that includes the ent, entHS and status
      causalStatus: bool, the causal status for the ent
      predSynonyms: list, predicate synonyms
      include: bool, include ent in the returned expression if True

    Returns:

      (healthStatus, neg, negText): tuple, the identified status (Span or
      Token, possibly None), followed by the two values returned by
      ``self.isNegation`` (``False`` and ``''`` when it is not called)
    """
    healthStatus = None
    neg = False
    negText = ''
    entRoot = ent.root
    head = entRoot.head
    if head.pos_ not in ['VERB'] and head.dep_ in ['prep']:
        # skip over the preposition to reach the governing token
        root = head.head
    else:
        root = head
    # causal only when the predicate is a causal keyword that is not also a status keyword
    causalStatus = [root.lemma_.lower()] in self._causalKeywords['VERB'] and [root.lemma_.lower()] not in self._statusKeywords['VERB']
    if entRoot.dep_ not in ['pobj', 'dobj']:
        return healthStatus, neg, negText
    if root.pos_ != 'VERB':
        neg, negText = self.isNegation(root)
        if root.pos_ in ['ADJ']:
            healthStatus = root
        elif root.pos_ in ['NOUN']:
            if root.dep_ in ['pobj']:
                healthStatus = root.doc[root.head.head.i:root.i+1]
            else:
                healthStatus = root
        elif root.pos_ in ['AUX']:
            lefts = list(root.lefts)
            # an auxiliary without left children gives nothing to slice:
            # the status stays None instead of failing on an empty list
            if lefts:
                healthStatus = root.doc[lefts[0].i:root.i]
        else:
            logger.warning(f'No status identified for "{ent}" in "{sent}"')
    else:
        if not causalStatus:
            if [root.lemma_.lower()] in predSynonyms:
                entHS._.set('hs_keyword', root.lemma_)
            else:
                entHS._.set('ent_status_verb', root.lemma_)
            passive = self.isPassive(root)
            neg, negText = self.isNegation(root)
            healthStatus = self.findLeftSubj(root, passive)
            if healthStatus is not None and healthStatus.pos_ in ['PRON']:
                # coreference resolution
                passive = self.isPassive(root.head)
                neg, negText = self.isNegation(root.head)
                healthStatus = self.findLeftSubj(root.head, passive)
            if healthStatus is not None:
                healthStatus = self.getAmod(healthStatus, healthStatus.i, healthStatus.i+1, include=True)
            else:
                healthStatus = self.getAmod(ent, ent.start, ent.end, include=include)
                if healthStatus is None:
                    # fall back to the first content token on the right of the predicate, after the entity
                    rights = [tk for tk in root.rights if tk.pos_ not in ['SPACE', 'PUNCT'] and tk.i >= ent.end]
                    if len(rights) > 0 and rights[0].pos_ in ['VERB', 'NOUN', 'ADJ', 'ADV']:
                        healthStatus = rights[0]
        else:
            if entRoot.dep_ in ['pobj']:
                healthStatus = self.getHealthStatusForPobj(ent, include=include)
            else:
                healthStatus = self.getAmod(ent, ent.start, ent.end, include=include)
    return healthStatus, neg, negText