# Source code for src.dackar.workflows.WorkOrderProcessing

# Copyright 2024, Battelle Energy Alliance, LLC  ALL RIGHTS RESERVED
"""
Created on March, 2024

@author: wangc, mandd
"""
import logging
import pandas as pd
import re
from spacy.tokens import Token
from spacy.tokens import Span

from ..text_processing.Preprocessing import Preprocessing
from ..utils.utils import getOnlyWords, getShortAcronym
from ..config import nlpConfig
from .WorkflowBase import WorkflowBase

logger = logging.getLogger(__name__)

# Custom attributes attached to spaCy Span/Token objects by this workflow.
# 'conjecture' is a boolean flag; the rest hold extracted text/values.
# spaCy raises an error when an extension is registered twice, so each
# registration is guarded with has_extension (safe on repeated imports).
_CUSTOM_EXTENSIONS = {
    'conjecture': False,
    'status': None,
    'neg': None,
    'neg_text': None,
    'alias': None,
}
for _attr, _default in _CUSTOM_EXTENSIONS.items():
    if not Span.has_extension(_attr):
        Span.set_extension(_attr, default=_default)
    if not Token.has_extension(_attr):
        Token.set_extension(_attr, default=_default)
class WorkOrderProcessing(WorkflowBase):
    """
    Class to process CWS work order dataset: extracts per-entity health status
    (with negation/conjecture flags) and causal relations, and writes both to
    the CSV files configured in nlpConfig.
    """

    def __init__(self, nlp, entID='SSC', *args, **kwargs):
        """
        Construct

        Args:
            nlp: spacy.Language object, contains all components and data needed to process text
            entID: str, label used to identify the entities of interest (default 'SSC')
            args: list, positional arguments
            kwargs: dict, keyword arguments

        Returns:
            None
        """
        super().__init__(nlp, entID, causalKeywordID='causal', *args, **kwargs)
        # accumulated [subject, relation, object] triples across all processed sentences
        self._allRelPairs = []
        # column names used for the relation output CSV
        self._relationNames = ['Subj_Entity', 'Relation', 'Obj_Entity']

    def reset(self):
        """
        Reset rule-based matcher and clear accumulated extraction results

        Args:
            None

        Returns:
            None
        """
        super().reset()
        self._allRelPairs = []
        self._entStatus = None

    def addKeywords(self, keywords, ktype):
        """
        Method to update self._causalKeywords or self._statusKeywords

        Args:
            keywords: dict, keywords that will be add to self._causalKeywords or self._statusKeywords
            ktype: string, either 'status' or 'causal'
        """
        if not isinstance(keywords, dict):
            # IOError kept for backward compatibility with existing callers
            raise IOError('"addCausalKeywords" method can only accept dictionary, but got {}'.format(type(keywords)))
        if ktype.lower() == 'status':
            for key, val in keywords.items():
                if not isinstance(val, list):
                    val = [val]
                val = self.extractLemma(val)
                if key in self._statusKeywords:
                    # NOTE(review): append adds the lemma list as a single nested
                    # element; confirm extend was not intended here.
                    self._statusKeywords[key].append(val)
                else:
                    logger.warning('keyword "{}" cannot be accepted, valid keys for the keywords are "{}"'.format(key, ','.join(list(self._statusKeywords.keys()))))
        else:
            # Previously any non-'status' ktype was silently ignored; surface it.
            logger.warning('keyword type "{}" is not handled by this method; only "status" keywords are updated here.'.format(ktype))

    def extractInformation(self):
        """
        Extract health status and causal relations, and write both to the
        CSV files given by nlpConfig['files'].

        Args:
            None

        Returns:
            None
        """
        ## health status
        logger.info('Start to extract health status')
        self.extractHealthStatus(self._matchedSents)
        ## Access status and output to an ordered csv file
        entList = []
        aliasList = []
        entTextList = []
        statusList = []
        cjList = []
        negList = []
        negTextList = []
        for sent in self._matchedSents:
            ents = self.getCustomEnts(sent.ents, self._entityLabels[self._entID])
            for ent in ents:
                # only report entities for which a status was actually extracted
                if ent._.status is not None:
                    entList.append(ent.text)
                    aliasList.append(ent._.alias)
                    # prefer the human-readable alias when one is available
                    if ent._.alias is not None:
                        entTextList.append(ent._.alias)
                    else:
                        entTextList.append(ent.text)
                    statusList.append(ent._.status)
                    cjList.append(ent._.conjecture)
                    negList.append(ent._.neg)
                    negTextList.append(ent._.neg_text)
        # Extracted information can be treated as attributes for given entity
        dfStatus = pd.DataFrame({'entity': entList,
                                 'alias': aliasList,
                                 'entity_text': entTextList,
                                 'status': statusList,
                                 'conjecture': cjList,
                                 'negation': negList,
                                 'negation_text': negTextList})
        dfStatus.to_csv(nlpConfig['files']['output_status_file'], columns=['entity', 'alias', 'entity_text', 'status', 'conjecture', 'negation', 'negation_text'])
        self._entStatus = dfStatus
        logger.info('End of health status extraction!')
        # Extract entity relations
        logger.info('Start to extract causal relation using OPM model information')
        self.extractRelDep(self._matchedSents)
        dfRels = pd.DataFrame(self._allRelPairs, columns=self._relationNames)
        dfRels.to_csv(nlpConfig['files']['output_relation_file'], columns=self._relationNames)
        logger.info('End of causal relation extraction!')

    def extractHealthStatus(self, matchedSents, predSynonyms=[], exclPrepos=[]):
        """
        Extract health status and relation

        Args:
            matchedSents: list, the matched sentences
            predSynonyms: list, predicate synonyms (currently unused)
            exclPrepos: list, exclude the prepositions (currently unused)
        """
        subjList = ['nsubj', 'nsubjpass', 'nsubj:pass']
        objList = ['pobj', 'dobj', 'iobj', 'obj', 'obl', 'oprd']
        # procedure to process CWS data
        # collect status, negation, conjecture information
        for sent in matchedSents:
            ents = self.getCustomEnts(sent.ents, self._entityLabels[self._entID])
            root = sent.root
            # negation/conjecture are determined once from the sentence root and
            # applied to every entity in the sentence
            neg, negText = self.isNegation(root)
            conjecture = self.isConjecture(root)
            if ents is None:
                continue
            for ent in ents:
                ent._.set('neg', neg)
                ent._.set('neg_text', negText)
                ent._.set('conjecture', conjecture)
                if ent._.alias is not None:
                    # entity at the beginning of sentence
                    if ent.start == sent.start:
                        status = sent[ent.end:]
                        # some clean up for the text
                        text = self._textProcess(status.text)
                        ent._.set('status', text)
                    # entity at the end of sentence (optionally followed by punctuation)
                    elif ent.end == sent.end or (ent.end == sent.end - 1 and sent[-1].is_punct):
                        text = sent.text
                        # substitute entity ID with its alias; re.escape guards
                        # against regex metacharacters in the entity text
                        text = re.sub(r"\b%s\b" % re.escape(ent.text), ent._.alias, text)
                        text = self._textProcess(text)
                        ent._.set('status', text)
                    # entity in the middle of sentence
                    else:
                        entRoot = ent.root
                        # Only include Pred and Obj info
                        if entRoot.dep_ in subjList:
                            status = sent[ent.end:]
                            # some clean up for the text
                            text = self._textProcess(status.text)
                            ent._.set('status', text)
                        # Include the whole info with alias substitution
                        elif entRoot.dep_ in objList:
                            text = sent.text
                            # substitute entity ID with its alias
                            text = re.sub(r"\b%s\b" % re.escape(ent.text), ent._.alias, text)
                            text = getOnlyWords(text)
                            text = self._textProcess(text)
                            ent._.set('status', text)
                # other type of entities (no alias available)
                else:
                    entRoot = ent.root
                    if entRoot.dep_ in subjList:
                        # depend on the application, can use self.getHealthStatusForSubj to get the status
                        status = sent[ent.end:]
                        # some clean up for the text
                        text = self._textProcess(status.text)
                        ent._.set('status', text)
                    # Include the whole info with alias substitution
                    elif entRoot.dep_ in objList:
                        # depend on the application, can use self.getHealthStatusForObj to get the status
                        text = sent.text
                        text = getOnlyWords(text)
                        text = self._textProcess(text)
                        ent._.set('status', text)
                    else:
                        # if the entity not among subj and obj, it may not need to report it
                        pass

    def extractRelDep(self, matchedSents):
        """
        Collect [subject, relation, object] triples from sentences containing
        two or more entities and accumulate them in self._allRelPairs.

        Args:
            matchedSents: list, the list of matched sentences

        Returns:
            None (results are appended to self._allRelPairs)
        """
        subjList = ['nsubj', 'nsubjpass', 'nsubj:pass']
        for sent in matchedSents:
            ents = self.getCustomEnts(sent.ents, self._entityLabels[self._entID])
            # guard against None for consistency with extractHealthStatus;
            # relations require at least two entities
            if ents is None or len(ents) <= 1:
                continue
            root = sent.root
            allRelPairs = []
            subjEnt = []
            subjConjEnt = []
            objEnt = []
            objConjEnt = []
            for ent in ents:
                entRoot = ent.root
                if ent._.alias is not None:
                    text = ent._.alias
                else:
                    text = ent.text
                # entity at the beginning of sentence is treated as a subject
                if ent.start == sent.start:
                    subjEnt.append(text)
                # conjunct left of the root -> conjoined with the subject
                elif entRoot.dep_ in ['conj'] and entRoot.i < root.i:
                    subjConjEnt.append(text)
                elif entRoot.dep_ in subjList:
                    subjEnt.append(text)
                elif entRoot.dep_ in ['obj', 'dobj']:
                    objEnt.append(text)
                # conjunct right of the root -> conjoined with the object
                elif entRoot.i > root.i and entRoot.dep_ in ['conj']:
                    objConjEnt.append(text)
            # NOTE(review): 'root' below is a spaCy Token stored as the relation;
            # it is stringified when written to CSV — confirm root.text/lemma_ was
            # not intended instead.
            # subj
            for subj in subjEnt:
                for subjConj in subjConjEnt:
                    allRelPairs.append([subj, 'conj', subjConj])
                for obj in objEnt:
                    allRelPairs.append([subj, root, obj])
                for objConj in objConjEnt:
                    allRelPairs.append([subj, root, objConj])
            # subjconj
            for subjConj in subjConjEnt:
                for obj in objEnt:
                    allRelPairs.append([subjConj, root, obj])
                for objConj in objConjEnt:
                    allRelPairs.append([subjConj, root, objConj])
            # obj
            for obj in objEnt:
                for objConj in objConjEnt:
                    allRelPairs.append([obj, 'conj', objConj])
            self._allRelPairs += allRelPairs