Initial commit for comprehend moderator (#9665)

This PR implements a custom chain that wraps Amazon Comprehend API
calls. The custom chain is aimed to be used with LLM chains to provide
moderation capability that let’s you detect and redact PII, Toxic and
Intent content in the LLM prompt, or the LLM response. The
implementation accepts a configuration object to control what checks
will be performed on a LLM prompt and can be used in a variety of setups
using the LangChain expression language to not only detect the
configured info in chains, but also other constructs such as a
retriever.
The included sample notebook goes over the different configuration
options and how to use it with other chains.

###  Usage sample
```python
from langchain_experimental.comprehend_moderation import BaseModerationActions, BaseModerationFilters

moderation_config = { 
        "filters":[ 
                BaseModerationFilters.PII, 
                BaseModerationFilters.TOXICITY,
                BaseModerationFilters.INTENT
        ],
        "pii":{ 
                "action": BaseModerationActions.ALLOW, 
                "threshold":0.5, 
                "labels":["SSN"],
                "mask_character": "X"
        },
        "toxicity":{ 
                "action": BaseModerationActions.STOP, 
                "threshold":0.5
        },
        "intent":{ 
                "action": BaseModerationActions.STOP, 
                "threshold":0.5
        }
}

comp_moderation_with_config = AmazonComprehendModerationChain(
    moderation_config=moderation_config, #specify the configuration
    client=comprehend_client,            #optionally pass the Boto3 Client
    verbose=True
)

template = """Question: {question}

Answer:"""

prompt = PromptTemplate(template=template, input_variables=["question"])

responses = [
    "Final Answer: A credit card number looks like 1289-2321-1123-2387. A fake SSN number looks like 323-22-9980. John Doe's phone number is (999)253-9876.", 
    "Final Answer: This is a really shitty way of constructing a birdhouse. This is fucking insane to think that any birds would actually create their motherfucking nests here."
]
llm = FakeListLLM(responses=responses)

llm_chain = LLMChain(prompt=prompt, llm=llm)

chain = ( 
    prompt 
    | comp_moderation_with_config 
    | {llm_chain.input_keys[0]: lambda x: x['output'] }  
    | llm_chain 
    | { "input": lambda x: x['text'] } 
    | comp_moderation_with_config 
)

response = chain.invoke({"question": "A sample SSN number looks like this 123-456-7890. Can you give me some more samples?"})

print(response['output'])


```
### Output
```
> Entering new AmazonComprehendModerationChain chain...
Running AmazonComprehendModerationChain...
Running pii validation...
Found PII content..stopping..
The prompt contains PII entities and cannot be processed
```

---------

Co-authored-by: Piyush Jain <piyushjain@duck.com>
Co-authored-by: Anjan Biswas <anjanavb@amazon.com>
Co-authored-by: Jha <nikjha@amazon.com>
Co-authored-by: Bagatur <baskaryan@gmail.com>
pull/9741/head
nikhilkjha 1 year ago committed by GitHub
parent 4339d21cf1
commit d57d08fd01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,25 @@
from langchain_experimental.comprehend_moderation.amazon_comprehend_moderation import (
AmazonComprehendModerationChain,
)
from langchain_experimental.comprehend_moderation.base_moderation import BaseModeration
from langchain_experimental.comprehend_moderation.base_moderation_callbacks import (
BaseModerationCallbackHandler,
)
from langchain_experimental.comprehend_moderation.base_moderation_enums import (
BaseModerationActions,
BaseModerationFilters,
)
from langchain_experimental.comprehend_moderation.intent import ComprehendIntent
from langchain_experimental.comprehend_moderation.pii import ComprehendPII
from langchain_experimental.comprehend_moderation.toxicity import ComprehendToxicity
__all__ = [
"BaseModeration",
"BaseModerationActions",
"BaseModerationFilters",
"ComprehendPII",
"ComprehendIntent",
"ComprehendToxicity",
"BaseModerationCallbackHandler",
"AmazonComprehendModerationChain",
]

@ -0,0 +1,184 @@
from typing import Any, Dict, List, Optional
from langchain.callbacks.manager import CallbackManagerForChainRun
from langchain.chains.base import Chain
from langchain_experimental.comprehend_moderation.base_moderation import (
BaseModeration,
)
from langchain_experimental.comprehend_moderation.base_moderation_callbacks import (
BaseModerationCallbackHandler,
)
from langchain_experimental.pydantic_v1 import root_validator
class AmazonComprehendModerationChain(Chain):
"""A subclass of Chain, designed to apply moderation to LLMs."""
output_key: str = "output" #: :meta private:
"""Key used to fetch/store the output in data containers. Defaults to `output`"""
input_key: str = "input" #: :meta private:
"""Key used to fetch/store the input in data containers. Defaults to `input`"""
moderation_config: Optional[Dict[str, Any]] = None
"""Configuration settings for moderation"""
client: Optional[Any]
"""boto3 client object for connection to Amazon Comprehend"""
region_name: Optional[str] = None
"""The aws region e.g., `us-west-2`. Fallsback to AWS_DEFAULT_REGION env variable
or region specified in ~/.aws/config in case it is not provided here.
"""
credentials_profile_name: Optional[str] = None
"""The name of the profile in the ~/.aws/credentials or ~/.aws/config files, which
has either access keys or role information specified.
If not specified, the default credential profile or, if on an EC2 instance,
credentials from IMDS will be used.
See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
"""
moderation_callback: Optional[BaseModerationCallbackHandler] = None
"""Callback handler for moderation, this is different
from regular callbacks which can be used in addition to this."""
unique_id: Optional[str] = None
"""A unique id that can be used to identify or group a user or session"""
@root_validator(pre=True)
def create_client(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""
Creates an Amazon Comprehend client
Args:
values (Dict[str, Any]): A dictionary containing configuration values.
Returns:
Dict[str, Any]: A dictionary with the updated configuration values,
including the Amazon Comprehend client.
Raises:
ModuleNotFoundError: If the 'boto3' package is not installed.
ValueError: If there is an issue importing 'boto3' or loading
AWS credentials.
Example:
.. code-block:: python
config = {
"credentials_profile_name": "my-profile",
"region_name": "us-west-2"
}
updated_config = create_client(config)
comprehend_client = updated_config["client"]
"""
if values.get("client") is not None:
return values
try:
import boto3
if values.get("credentials_profile_name"):
session = boto3.Session(profile_name=values["credentials_profile_name"])
else:
# use default credentials
session = boto3.Session()
client_params = {}
if values.get("region_name"):
client_params["region_name"] = values["region_name"]
values["client"] = session.client("comprehend", **client_params)
return values
except ImportError:
raise ModuleNotFoundError(
"Could not import boto3 python package. "
"Please install it with `pip install boto3`."
)
except Exception as e:
raise ValueError(
"Could not load credentials to authenticate with AWS client. "
"Please check that credentials in the specified "
"profile name are valid."
) from e
@property
def output_keys(self) -> List[str]:
"""
Returns a list of output keys.
This method defines the output keys that will be used to access the output
values produced by the chain or function. It ensures that the specified keys
are available to access the outputs.
Returns:
List[str]: A list of output keys.
Note:
This method is considered private and may not be intended for direct
external use.
"""
return [self.output_key]
@property
def input_keys(self) -> List[str]:
"""
Returns a list of input keys expected by the prompt.
This method defines the input keys that the prompt expects in order to perform
its processing. It ensures that the specified keys are available for providing
input to the prompt.
Returns:
List[str]: A list of input keys.
Note:
This method is considered private and may not be intended for direct
external use.
"""
return [self.input_key]
def _call(
self,
inputs: Dict[str, Any],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, str]:
"""
Executes the moderation process on the input text and returns the processed
output.
This internal method performs the moderation process on the input text. It
converts the input prompt value to plain text, applies the specified filters,
and then converts the filtered output back to a suitable prompt value object.
Additionally, it provides the option to log information about the run using
the provided `run_manager`.
Args:
inputs: A dictionary containing input values
run_manager: A run manager to handle run-related events. Default is None
Returns:
Dict[str, str]: A dictionary containing the processed output of the
moderation process.
Raises:
ValueError: If there is an error during the moderation process
"""
if run_manager:
run_manager.on_text("Running AmazonComprehendModerationChain...\n")
moderation = BaseModeration(
client=self.client,
config=self.moderation_config,
moderation_callback=self.moderation_callback,
unique_id=self.unique_id,
run_manager=run_manager,
)
response = moderation.moderate(prompt=inputs[self.input_keys[0]])
return {self.output_key: response}

@ -0,0 +1,176 @@
import uuid
from typing import Any, Callable, Dict, Optional
from langchain.callbacks.manager import CallbackManagerForChainRun
from langchain.prompts.base import StringPromptValue
from langchain.prompts.chat import ChatPromptValue
from langchain.schema import AIMessage, HumanMessage
from langchain_experimental.comprehend_moderation.intent import ComprehendIntent
from langchain_experimental.comprehend_moderation.pii import ComprehendPII
from langchain_experimental.comprehend_moderation.toxicity import ComprehendToxicity
class BaseModeration:
def __init__(
self,
client: Any,
config: Optional[Dict[str, Any]] = None,
moderation_callback: Optional[Any] = None,
unique_id: Optional[str] = None,
run_manager: Optional[CallbackManagerForChainRun] = None,
):
self.client = client
self.config = config
self.moderation_callback = moderation_callback
self.unique_id = unique_id
self.chat_message_index = 0
self.run_manager = run_manager
self.chain_id = str(uuid.uuid4())
def _convert_prompt_to_text(self, prompt: Any) -> str:
input_text = str()
if isinstance(prompt, StringPromptValue):
input_text = prompt.text
elif isinstance(prompt, str):
input_text = prompt
elif isinstance(prompt, ChatPromptValue):
"""
We will just check the last message in the message Chain of a
ChatPromptTemplate. The typical chronology is
SystemMessage > HumanMessage > AIMessage and so on. However assuming
that with every chat the chain is invoked we will only check the last
message. This is assuming that all previous messages have been checked
already. Only HumanMessage and AIMessage will be checked. We can perhaps
loop through and take advantage of the additional_kwargs property in the
HumanMessage and AIMessage schema to mark messages that have been moderated.
However that means that this class could generate multiple text chunks
and moderate() logics would need to be updated. This also means some
complexity in re-constructing the prompt while keeping the messages in
sequence.
"""
message = prompt.messages[-1]
self.chat_message_index = len(prompt.messages) - 1
if isinstance(message, HumanMessage):
input_text = message.content
if isinstance(message, AIMessage):
input_text = message.content
else:
raise ValueError(
f"Invalid input type {type(input)}. "
"Must be a PromptValue, str, or list of BaseMessages."
)
return input_text
def _convert_text_to_prompt(self, prompt: Any, text: str) -> Any:
if isinstance(prompt, StringPromptValue):
return StringPromptValue(text=text)
elif isinstance(prompt, str):
return text
elif isinstance(prompt, ChatPromptValue):
messages = prompt.messages
message = messages[self.chat_message_index]
if isinstance(message, HumanMessage):
messages[self.chat_message_index] = HumanMessage(
content=text,
example=message.example,
additional_kwargs=message.additional_kwargs,
)
if isinstance(message, AIMessage):
messages[self.chat_message_index] = AIMessage(
content=text,
example=message.example,
additional_kwargs=message.additional_kwargs,
)
return ChatPromptValue(messages=messages)
else:
raise ValueError(
f"Invalid input type {type(input)}. "
"Must be a PromptValue, str, or list of BaseMessages."
)
def _moderation_class(self, moderation_class: Any) -> Callable:
return moderation_class(
client=self.client,
callback=self.moderation_callback,
unique_id=self.unique_id,
chain_id=self.chain_id,
).validate
def _log_message_for_verbose(self, message: str) -> None:
if self.run_manager:
self.run_manager.on_text(message)
def moderate(self, prompt: Any) -> str:
from langchain_experimental.comprehend_moderation.base_moderation_exceptions import ( # noqa: E501
ModerationIntentionError,
ModerationPiiError,
ModerationToxicityError,
)
try:
# convert prompt to text
input_text = self._convert_prompt_to_text(prompt=prompt)
output_text = str()
# perform moderation
if self.config is None:
# In absence of config Action will default to STOP only
self._log_message_for_verbose("Running pii validation...\n")
pii_validate = self._moderation_class(moderation_class=ComprehendPII)
output_text = pii_validate(prompt_value=input_text)
self._log_message_for_verbose("Running toxicity validation...\n")
toxicity_validate = self._moderation_class(
moderation_class=ComprehendToxicity
)
output_text = toxicity_validate(prompt_value=output_text)
self._log_message_for_verbose("Running intent validation...\n")
intent_validate = self._moderation_class(
moderation_class=ComprehendIntent
)
output_text = intent_validate(prompt_value=output_text)
else:
filter_functions = {
"pii": ComprehendPII,
"toxicity": ComprehendToxicity,
"intent": ComprehendIntent,
}
filters = self.config["filters"]
for _filter in filters:
filter_name = f"{_filter}"
if filter_name in filter_functions:
self._log_message_for_verbose(
f"Running {filter_name} Validation...\n"
)
validation_fn = self._moderation_class(
moderation_class=filter_functions[filter_name]
)
input_text = input_text if not output_text else output_text
output_text = validation_fn(
prompt_value=input_text,
config=self.config[filter_name]
if filter_name in self.config
else None,
)
# convert text to prompt and return
return self._convert_text_to_prompt(prompt=prompt, text=output_text)
except ModerationPiiError as e:
self._log_message_for_verbose(f"Found PII content..stopping..\n{str(e)}\n")
raise e
except ModerationToxicityError as e:
self._log_message_for_verbose(
f"Found Toxic content..stopping..\n{str(e)}\n"
)
raise e
except ModerationIntentionError as e:
self._log_message_for_verbose(
f"Found Harmful intention..stopping..\n{str(e)}\n"
)
raise e
except Exception as e:
raise e

@ -0,0 +1,64 @@
from typing import Any, Callable, Dict
class BaseModerationCallbackHandler:
def __init__(self) -> None:
if (
self._is_method_unchanged(
BaseModerationCallbackHandler.on_after_pii, self.on_after_pii
)
and self._is_method_unchanged(
BaseModerationCallbackHandler.on_after_toxicity, self.on_after_toxicity
)
and self._is_method_unchanged(
BaseModerationCallbackHandler.on_after_intent, self.on_after_intent
)
):
raise NotImplementedError(
"Subclasses must override at least one of on_after_pii(), "
"on_after_toxicity(), or on_after_intent() functions."
)
def _is_method_unchanged(
self, base_method: Callable, derived_method: Callable
) -> bool:
return base_method.__qualname__ == derived_method.__qualname__
async def on_after_pii(
self, moderation_beacon: Dict[str, Any], unique_id: str, **kwargs: Any
) -> None:
"""Run after PII validation is complete."""
raise NotImplementedError("Subclasses should implement this async method.")
async def on_after_toxicity(
self, moderation_beacon: Dict[str, Any], unique_id: str, **kwargs: Any
) -> None:
"""Run after Toxicity validation is complete."""
raise NotImplementedError("Subclasses should implement this async method.")
async def on_after_intent(
self, moderation_beacon: Dict[str, Any], unique_id: str, **kwargs: Any
) -> None:
"""Run after Toxicity validation is complete."""
raise NotImplementedError("Subclasses should implement this async method.")
@property
def pii_callback(self) -> bool:
return (
self.on_after_pii.__func__ # type: ignore
is not BaseModerationCallbackHandler.on_after_pii
)
@property
def toxicity_callback(self) -> bool:
return (
self.on_after_toxicity.__func__ # type: ignore
is not BaseModerationCallbackHandler.on_after_toxicity
)
@property
def intent_callback(self) -> bool:
return (
self.on_after_intent.__func__ # type: ignore
is not BaseModerationCallbackHandler.on_after_intent
)

@ -0,0 +1,12 @@
from enum import Enum
class BaseModerationActions(Enum):
STOP = 1
ALLOW = 2
class BaseModerationFilters(str, Enum):
PII = "pii"
TOXICITY = "toxicity"
INTENT = "intent"

@ -0,0 +1,43 @@
class ModerationPiiError(Exception):
"""Exception raised if PII entities are detected.
Attributes:
message -- explanation of the error
"""
def __init__(
self, message: str = "The prompt contains PII entities and cannot be processed"
):
self.message = message
super().__init__(self.message)
class ModerationToxicityError(Exception):
"""Exception raised if Toxic entities are detected.
Attributes:
message -- explanation of the error
"""
def __init__(
self, message: str = "The prompt contains toxic content and cannot be processed"
):
self.message = message
super().__init__(self.message)
class ModerationIntentionError(Exception):
"""Exception raised if Intention entities are detected.
Attributes:
message -- explanation of the error
"""
def __init__(
self,
message: str = (
"The prompt indicates an un-desired intent and " "cannot be processed"
),
):
self.message = message
super().__init__(self.message)

@ -0,0 +1,101 @@
import asyncio
import warnings
from typing import Any, Dict, Optional
from langchain_experimental.comprehend_moderation.base_moderation_exceptions import (
ModerationIntentionError,
)
class ComprehendIntent:
def __init__(
self,
client: Any,
callback: Optional[Any] = None,
unique_id: Optional[str] = None,
chain_id: Optional[str] = None,
) -> None:
self.client = client
self.moderation_beacon = {
"moderation_chain_id": chain_id,
"moderation_type": "Intent",
"moderation_status": "LABELS_NOT_FOUND",
}
self.callback = callback
self.unique_id = unique_id
def _get_arn(self) -> str:
region_name = self.client.meta.region_name
service = "comprehend"
intent_endpoint = "document-classifier-endpoint/prompt-intent"
return f"arn:aws:{service}:{region_name}:aws:{intent_endpoint}"
def validate(
self, prompt_value: str, config: Optional[Dict[str, Any]] = None
) -> str:
"""
Check and validate the intent of the given prompt text.
Args:
comprehend_client: Comprehend client for intent classification
prompt_value (str): The input text to be checked for unintended intent
config (Dict[str, Any]): Configuration settings for intent checks
Raises:
ValueError: If unintended intent is found in the prompt text based
on the specified threshold.
Returns:
str: The input prompt_value.
Note:
This function checks the intent of the provided prompt text using
Comprehend's classify_document API and raises an error if unintended
intent is detected with a score above the specified threshold.
"""
from langchain_experimental.comprehend_moderation.base_moderation_enums import (
BaseModerationActions,
)
threshold = config.get("threshold", 0.5) if config else 0.5
action = (
config.get("action", BaseModerationActions.STOP)
if config
else BaseModerationActions.STOP
)
intent_found = False
if action == BaseModerationActions.ALLOW:
warnings.warn(
"You have allowed content with Harmful content."
"Defaulting to STOP action..."
)
action = BaseModerationActions.STOP
endpoint_arn = self._get_arn()
response = self.client.classify_document(
Text=prompt_value, EndpointArn=endpoint_arn
)
if self.callback and self.callback.intent_callback:
self.moderation_beacon["moderation_input"] = prompt_value
self.moderation_beacon["moderation_output"] = response
for class_result in response["Classes"]:
if (
class_result["Score"] >= threshold
and class_result["Name"] == "UNDESIRED_PROMPT"
):
intent_found = True
break
if self.callback and self.callback.intent_callback:
if intent_found:
self.moderation_beacon["moderation_status"] = "LABELS_FOUND"
asyncio.create_task(
self.callback.on_after_intent(self.moderation_beacon, self.unique_id)
)
if intent_found:
raise ModerationIntentionError
return prompt_value

@ -0,0 +1,173 @@
import asyncio
from typing import Any, Dict, Optional
from langchain_experimental.comprehend_moderation.base_moderation_exceptions import (
ModerationPiiError,
)
class ComprehendPII:
def __init__(
self,
client: Any,
callback: Optional[Any] = None,
unique_id: Optional[str] = None,
chain_id: Optional[str] = None,
) -> None:
self.client = client
self.moderation_beacon = {
"moderation_chain_id": chain_id,
"moderation_type": "PII",
"moderation_status": "LABELS_NOT_FOUND",
}
self.callback = callback
self.unique_id = unique_id
def validate(
self, prompt_value: str, config: Optional[Dict[str, Any]] = None
) -> str:
from langchain_experimental.comprehend_moderation.base_moderation_enums import (
BaseModerationActions,
)
if config:
action = config.get("action", BaseModerationActions.STOP)
if action not in [BaseModerationActions.STOP, BaseModerationActions.ALLOW]:
raise ValueError("Action can either be stop or allow")
return (
self._contains_pii(prompt_value=prompt_value, config=config)
if action == BaseModerationActions.STOP
else self._detect_pii(prompt_value=prompt_value, config=config)
)
else:
return self._contains_pii(prompt_value=prompt_value)
def _contains_pii(
self, prompt_value: str, config: Optional[Dict[str, Any]] = None
) -> str:
"""
Checks for Personally Identifiable Information (PII) labels above a
specified threshold.
Args:
prompt_value (str): The input text to be checked for PII labels.
config (Dict[str, Any]): Configuration for PII check and actions.
Returns:
str: the original prompt
Note:
- The provided client should be initialized with valid AWS credentials.
"""
pii_identified = self.client.contains_pii_entities(
Text=prompt_value, LanguageCode="en"
)
if self.callback and self.callback.pii_callback:
self.moderation_beacon["moderation_input"] = prompt_value
self.moderation_beacon["moderation_output"] = pii_identified
threshold = config.get("threshold", 0.5) if config else 0.5
pii_labels = config.get("labels", []) if config else []
pii_found = False
for entity in pii_identified["Labels"]:
if (entity["Score"] >= threshold and entity["Name"] in pii_labels) or (
entity["Score"] >= threshold and not pii_labels
):
pii_found = True
break
if self.callback and self.callback.pii_callback:
if pii_found:
self.moderation_beacon["moderation_status"] = "LABELS_FOUND"
asyncio.create_task(
self.callback.on_after_pii(self.moderation_beacon, self.unique_id)
)
if pii_found:
raise ModerationPiiError
return prompt_value
def _detect_pii(self, prompt_value: str, config: Optional[Dict[str, Any]]) -> str:
"""
Detects and handles Personally Identifiable Information (PII) entities in the
given prompt text using Amazon Comprehend's detect_pii_entities API. The
function provides options to redact or stop processing based on the identified
PII entities and a provided configuration.
Args:
prompt_value (str): The input text to be checked for PII entities.
config (Dict[str, Any]): A configuration specifying how to handle
PII entities.
Returns:
str: The processed prompt text with redacted PII entities or raised
exceptions.
Raises:
ValueError: If the prompt contains configured PII entities for
stopping processing.
Note:
- If PII is not found in the prompt, the original prompt is returned.
- The client should be initialized with valid AWS credentials.
"""
pii_identified = self.client.detect_pii_entities(
Text=prompt_value, LanguageCode="en"
)
if self.callback and self.callback.pii_callback:
self.moderation_beacon["moderation_input"] = prompt_value
self.moderation_beacon["moderation_output"] = pii_identified
if (pii_identified["Entities"]) == []:
if self.callback and self.callback.pii_callback:
asyncio.create_task(
self.callback.on_after_pii(self.moderation_beacon, self.unique_id)
)
return prompt_value
pii_found = False
if not config and pii_identified["Entities"]:
for entity in pii_identified["Entities"]:
if entity["Score"] >= 0.5:
pii_found = True
break
if self.callback and self.callback.pii_callback:
if pii_found:
self.moderation_beacon["moderation_status"] = "LABELS_FOUND"
asyncio.create_task(
self.callback.on_after_pii(self.moderation_beacon, self.unique_id)
)
if pii_found:
raise ModerationPiiError
else:
threshold = config.get("threshold", 0.5) # type: ignore
pii_labels = config.get("labels", []) # type: ignore
mask_marker = config.get("mask_character", "*") # type: ignore
pii_found = False
for entity in pii_identified["Entities"]:
if (
pii_labels
and entity["Type"] in pii_labels
and entity["Score"] >= threshold
) or (not pii_labels and entity["Score"] >= threshold):
pii_found = True
char_offset_begin = entity["BeginOffset"]
char_offset_end = entity["EndOffset"]
prompt_value = (
prompt_value[:char_offset_begin]
+ mask_marker * (char_offset_end - char_offset_begin)
+ prompt_value[char_offset_end:]
)
if self.callback and self.callback.pii_callback:
if pii_found:
self.moderation_beacon["moderation_status"] = "LABELS_FOUND"
asyncio.create_task(
self.callback.on_after_pii(self.moderation_beacon, self.unique_id)
)
return prompt_value

@ -0,0 +1,209 @@
import asyncio
import importlib
import warnings
from typing import Any, Dict, List, Optional
from langchain_experimental.comprehend_moderation.base_moderation_exceptions import (
ModerationToxicityError,
)
class ComprehendToxicity:
def __init__(
self,
client: Any,
callback: Optional[Any] = None,
unique_id: Optional[str] = None,
chain_id: Optional[str] = None,
) -> None:
self.client = client
self.moderation_beacon = {
"moderation_chain_id": chain_id,
"moderation_type": "Toxicity",
"moderation_status": "LABELS_NOT_FOUND",
}
self.callback = callback
self.unique_id = unique_id
def _toxicity_init_validate(self, max_size: int) -> Any:
"""
Validate and initialize toxicity processing configuration.
Args:
max_size (int): Maximum sentence size defined in the configuration object.
Raises:
Exception: If the maximum sentence size exceeds the 5KB limit.
Note:
This function ensures that the NLTK punkt tokenizer is downloaded if not
already present.
Returns:
None
"""
if max_size > 1024 * 5:
raise Exception("The sentence length should not exceed 5KB.")
try:
nltk = importlib.import_module("nltk")
nltk.data.find("tokenizers/punkt")
return nltk
except ImportError:
raise ModuleNotFoundError(
"Could not import nltk python package. "
"Please install it with `pip install nltk`."
)
except LookupError:
nltk.download("punkt")
def _split_paragraph(
self, prompt_value: str, max_size: int = 1024 * 4
) -> List[List[str]]:
"""
Split a paragraph into chunks of sentences, respecting the maximum size limit.
Args:
paragraph (str): The input paragraph to be split into chunks
max_size (int, optional): The maximum size limit in bytes for each chunk
Defaults to 1024.
Returns:
List[List[str]]: A list of chunks, where each chunk is a list of sentences
Note:
This function validates the maximum sentence size based on service limits
using the 'toxicity_init_validate' function. It uses the NLTK sentence
tokenizer to split the paragraph into sentences.
"""
# validate max. sentence size based on Service limits
nltk = self._toxicity_init_validate(max_size)
sentences = nltk.sent_tokenize(prompt_value)
chunks = []
current_chunk = [] # type: ignore
current_size = 0
for sentence in sentences:
sentence_size = len(sentence.encode("utf-8"))
# If adding a new sentence exceeds max_size or
# current_chunk has 10 sentences, start a new chunk
if (current_size + sentence_size > max_size) or (len(current_chunk) >= 10):
if current_chunk: # Avoid appending empty chunks
chunks.append(current_chunk)
current_chunk = []
current_size = 0
current_chunk.append(sentence)
current_size += sentence_size
# Add any remaining sentences
if current_chunk:
chunks.append(current_chunk)
return chunks
def validate(
self, prompt_value: str, config: Optional[Dict[str, Any]] = None
) -> str:
"""
Check the toxicity of a given text prompt using AWS Comprehend service
and apply actions based on configuration.
Args:
prompt_value (str): The text content to be checked for toxicity.
config (Dict[str, Any]): Configuration for toxicity checks and actions.
Returns:
str: The original prompt_value if allowed or no toxicity found.
Raises:
ValueError: If the prompt contains toxic labels and cannot be
processed based on the configuration.
"""
chunks = self._split_paragraph(prompt_value=prompt_value)
for sentence_list in chunks:
segments = [{"Text": sentence} for sentence in sentence_list]
response = self.client.detect_toxic_content(
TextSegments=segments, LanguageCode="en"
)
if self.callback and self.callback.toxicity_callback:
self.moderation_beacon["moderation_input"] = segments # type: ignore
self.moderation_beacon["moderation_output"] = response
if config:
from langchain_experimental.comprehend_moderation.base_moderation_enums import ( # noqa: E501
BaseModerationActions,
)
toxicity_found = False
action = config.get("action", BaseModerationActions.STOP)
if action not in [
BaseModerationActions.STOP,
BaseModerationActions.ALLOW,
]:
raise ValueError("Action can either be stop or allow")
threshold = config.get("threshold", 0.5) if config else 0.5
toxicity_labels = config.get("labels", []) if config else []
if action == BaseModerationActions.STOP:
for item in response["ResultList"]:
for label in item["Labels"]:
if (
label
and (
not toxicity_labels
or label["Name"] in toxicity_labels
)
and label["Score"] >= threshold
):
toxicity_found = True
break
if action == BaseModerationActions.ALLOW:
if not toxicity_labels:
warnings.warn(
"You have allowed toxic content without specifying "
"any toxicity labels."
)
else:
for item in response["ResultList"]:
for label in item["Labels"]:
if (
label["Name"] in toxicity_labels
and label["Score"] >= threshold
):
toxicity_found = True
break
if self.callback and self.callback.toxicity_callback:
if toxicity_found:
self.moderation_beacon["moderation_status"] = "LABELS_FOUND"
asyncio.create_task(
self.callback.on_after_toxicity(
self.moderation_beacon, self.unique_id
)
)
if toxicity_found:
raise ModerationToxicityError
else:
if response["ResultList"]:
detected_toxic_labels = list()
for item in response["ResultList"]:
detected_toxic_labels.extend(item["Labels"])
if any(item["Score"] >= 0.5 for item in detected_toxic_labels):
if self.callback and self.callback.toxicity_callback:
self.moderation_beacon["moderation_status"] = "LABELS_FOUND"
asyncio.create_task(
self.callback.on_after_toxicity(
self.moderation_beacon, self.unique_id
)
)
raise ModerationToxicityError
return prompt_value
Loading…
Cancel
Save