Source code for src.dackar.causal.CausalPhrase
# Copyright 2024, Battelle Energy Alliance, LLC  ALL RIGHTS RESERVED
"""
Created on March, 2024
@author: wangc, mandd
"""
import logging
import pandas as pd
import re
from spacy.tokens import Token, Span
from ..text_processing.Preprocessing import Preprocessing
from ..utils.utils import getOnlyWords, getShortAcronym
from ..config import nlpConfig
from .CausalBase import CausalBase

logger = logging.getLogger(__name__)
if not Span.has_extension('conjecture'):
  Span.set_extension('conjecture', default=False)
if not Span.has_extension('status'):
  Span.set_extension("status", default=None)
if not Span.has_extension('neg'):
  Span.set_extension("neg", default=None)
if not Span.has_extension('neg_text'):
  Span.set_extension("neg_text", default=None)
if not Span.has_extension('alias'):
  Span.set_extension("alias", default=None)
if not Token.has_extension('conjecture'):
  Token.set_extension('conjecture', default=False)
if not Token.has_extension('status'):
  Token.set_extension("status", default=None)
if not Token.has_extension('neg'):
  Token.set_extension("neg", default=None)
if not Token.has_extension('neg_text'):
  Token.set_extension("neg_text", default=None)
if not Token.has_extension('alias'):
  Token.set_extension("alias", default=None)
class CausalPhrase(CausalBase):
  """
    Class to process short-phrase datasets
  """
  def __init__(self, nlp, entID='SSC', causalKeywordID='causal',*args, **kwargs):
    """
      Construct
      Args:
        nlp: spacy.Language object, contains all components and data needed to process text
        args: list, positional arguments
        kwargs: dict, keyword arguments
      Returns:
        None
    """
    super().__init__(nlp, entID, *args, causalKeywordID=causalKeywordID, **kwargs)
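
  # A minimal construction sketch (the spaCy model name is an assumption; the
  # pipeline is expected to label entities with the label referenced by entID):
  #   import spacy
  #   nlp = spacy.load('en_core_web_lg')
  #   causal = CausalPhrase(nlp, entID='SSC', causalKeywordID='causal')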
  def addKeywords(self, keywords, ktype):
    """
      Method to update self._causalKeywords or self._statusKeywords
      Args:
        keywords: dict, keywords that will be added to self._causalKeywords or self._statusKeywords
        ktype: string, either 'status' or 'causal'
    """
    if not isinstance(keywords, dict):
      raise IOError('"addKeywords" method can only accept dictionary, but got {}'.format(type(keywords)))
    if ktype.lower() == 'status':
      for key, val in keywords.items():
        if not isinstance(val, list):
          val = [val]
        val = self.extractLemma(val)
        if key in self._statusKeywords:
          self._statusKeywords[key].append(val)
        else:
          logger.warning('keyword "{}" cannot be accepted, valid keys for the keywords are "{}"'.format(key, ','.join(list(self._statusKeywords.keys()))))
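
  # A minimal usage sketch for addKeywords (the key 'VERB' and the lemmas are
  # assumptions; only keys already present in self._statusKeywords are accepted,
  # anything else is logged as a warning):
  #   causal.addKeywords({'VERB': ['degrade', 'erode']}, ktype='status')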
  def extractInformation(self):
    """
      Extract entity health status and general entity relations from the matched sentences
      Args:
        None
      Returns:
        None
    """
    ## health status
    logger.info('Start to extract health status')
    self.extractHealthStatus(self._matchedSents)
    rows = []
    for sent in self._matchedSents:
      ents = self.getCustomEnts(sent.ents, self._entityLabels[self._entID])
      for ent in ents:
        if ent._.status is not None:
          row = {'entity': ent.text,
                 'label': ent.label_,
                 'alias': ent._.alias,
                 'status': ent._.status,
                 'conjecture': ent._.conjecture,
                 'negation': ent._.neg,
                 'negation_text': ent._.neg_text}
          rows.append(row)
    self._entStatus = pd.DataFrame(rows)
    if 'output_status_file' in nlpConfig['files']:
      self._entStatus.to_csv(nlpConfig['files']['output_status_file'], columns=['entity','label', 'alias', 'status', 'conjecture', 'negation', 'negation_text'])
    logger.info('End of health status extraction!')
    # Extract entity relations
    logger.info('Start to extract general entity relation')
    self.extractRelDep(self._matchedSents)
    self._relationGeneral = pd.DataFrame(self._allRelPairs, columns=self._relationNames)
    if 'output_relation_file' in nlpConfig['files']:
      self._relationGeneral.to_csv(nlpConfig['files']['output_relation_file'], columns=self._relationNames)
    logger.info('End of general entity relation extraction!')
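
  # A minimal usage sketch for extractInformation (assumes self._matchedSents has
  # already been populated; the CSV files are only written when
  # 'output_status_file' / 'output_relation_file' are present in nlpConfig['files']):
  #   causal.extractInformation()
  #   print(causal._entStatus.head())
  #   print(causal._relationGeneral.head())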
  def extractHealthStatus(self, matchedSents, predSynonyms=[], exclPrepos=[]):
    """
      Extract health status and relation
      Args:
        matchedSents: list, the matched sentences
        predSynonyms: list, predicate synonyms
        exclPrepos: list, exclude the prepositions
    """
    subjList = ['nsubj', 'nsubjpass', 'nsubj:pass']
    objList = ['pobj', 'dobj', 'iobj', 'obj', 'obl', 'oprd']
    # procedure to process CWS data
    # collect status, negation, conjecture information
    for sent in matchedSents:
      ents = self.getCustomEnts(sent.ents, self._entityLabels[self._entID])
      root = sent.root
      neg, negText = self.isNegation(root)
      conjecture = self.isConjecture(root)
      if ents is None:
        continue
      for ent in ents:
        ent._.set('neg', neg)
        ent._.set('neg_text', negText)
        ent._.set('conjecture', conjecture)
        if ent._.alias is not None:
          # entity at the beginning of sentence
          if ent.start == sent.start:
            status = sent[ent.end - sent.start:]  # span slicing is relative to the sentence start
            # some clean up for the text
            text = self._textProcess(status.text)
            ent._.set('status', text)
          # entity at the end of sentence
          elif ent.end == sent.end or (ent.end == sent.end - 1 and sent[-1].is_punct):
            text = sent.text
            # substitute entity ID with its alias
            text = re.sub(r"\b%s\b" % re.escape(ent.text), ent._.alias, text)
            text = self._textProcess(text)
            ent._.set('status', text)
          # entity in the middle of sentence
          else:
            entRoot = ent.root
            # Only include Pred and Obj info
            if entRoot.dep_ in subjList:
              status = sent[ent.end - sent.start:]  # span slicing is relative to the sentence start
              # some clean up for the text
              text = self._textProcess(status.text)
              ent._.set('status', text)
            # Include the whole info with alias substitution
            elif entRoot.dep_ in objList:
              text = sent.text
              # substitute entity ID with its alias
              text = re.sub(r"\b%s\b" % re.escape(ent.text), ent._.alias, text)
              text = getOnlyWords(text)
              text = self._textProcess(text)
              ent._.set('status', text)
        # other type of entities
        else:
          entRoot = ent.root
          if entRoot.dep_ in subjList:
            # depending on the application, self.getHealthStatusForSubj can be used to get the status
            status = sent[ent.end - sent.start:]  # span slicing is relative to the sentence start
            # some clean up for the text
            text = self._textProcess(status.text)
            ent._.set('status', text)
          # Include the whole info with alias substitution
          elif entRoot.dep_ in objList:
            # depending on the application, self.getHealthStatusForObj can be used to get the status
            text = sent.text
            text = getOnlyWords(text)
            text = self._textProcess(text)
            ent._.set('status', text)
          else:
            # if the entity is neither a subject nor an object, it may not need to be reported
            pass
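
  # Illustration of the placement cases handled above (the example behavior is a
  # summary of this method, not additional functionality):
  #   entity at the start  -> status is the text following the entity
  #   entity at the end    -> status is the whole sentence, with the entity ID
  #                           replaced by its alias when an alias exists
  #   entity in the middle -> subject entities keep the trailing text; object
  #                           entities keep the whole (cleaned) sentence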