mirror of
https://github.com/openai/openai-cookbook
synced 2024-11-09 19:10:56 +00:00
6b00fd6ea8
Add explanative docstring to handle_file_string function and move docstrings to below function definition as per pep 257 specification
132 lines
5.0 KiB
Python
132 lines
5.0 KiB
Python
from typing import Iterator
|
|
from numpy import array, average
|
|
import openai
|
|
import pandas as pd
|
|
import numpy as np
|
|
|
|
from config import TEXT_EMBEDDING_CHUNK_SIZE, EMBEDDINGS_MODEL
|
|
from database import load_vectors
|
|
|
|
def get_col_average_from_list_of_lists(list_of_lists):
|
|
"""Return the average of each column in a list of lists."""
|
|
if len(list_of_lists) == 1:
|
|
return list_of_lists[0]
|
|
else:
|
|
list_of_lists_array = array(list_of_lists)
|
|
average_embedding = average(list_of_lists_array, axis=0)
|
|
return average_embedding.tolist()
|
|
|
|
# Create embeddings for a text using a tokenizer and an OpenAI engine
|
|
|
|
|
|
def create_embeddings_for_text(text, tokenizer):
|
|
"""Return a list of tuples (text_chunk, embedding) and an average embedding for a text."""
|
|
token_chunks = list(chunks(text, TEXT_EMBEDDING_CHUNK_SIZE, tokenizer))
|
|
text_chunks = [tokenizer.decode(chunk) for chunk in token_chunks]
|
|
|
|
embeddings_response = get_embeddings(text_chunks, EMBEDDINGS_MODEL)
|
|
embeddings = [embedding["embedding"] for embedding in embeddings_response]
|
|
text_embeddings = list(zip(text_chunks, embeddings))
|
|
|
|
average_embedding = get_col_average_from_list_of_lists(embeddings)
|
|
|
|
return (text_embeddings, average_embedding)
|
|
|
|
def get_embeddings(text_array, engine):
|
|
return openai.Engine(id=engine).embeddings(input=text_array)["data"]
|
|
|
|
# Split a text into smaller chunks of size n, preferably ending at the end of a sentence
|
|
def chunks(text, n, tokenizer):
|
|
"""Yield successive n-sized chunks from text."""
|
|
tokens = tokenizer.encode(text)
|
|
i = 0
|
|
while i < len(tokens):
|
|
# Find the nearest end of sentence within a range of 0.5 * n and 1.5 * n tokens
|
|
j = min(i + int(1.5 * n), len(tokens))
|
|
while j > i + int(0.5 * n):
|
|
# Decode the tokens and check for full stop or newline
|
|
chunk = tokenizer.decode(tokens[i:j])
|
|
if chunk.endswith(".") or chunk.endswith("\n"):
|
|
break
|
|
j -= 1
|
|
# If no end of sentence found, use n tokens as the chunk size
|
|
if j == i + int(0.5 * n):
|
|
j = min(i + n, len(tokens))
|
|
yield tokens[i:j]
|
|
i = j
|
|
|
|
def get_unique_id_for_file_chunk(filename, chunk_index):
|
|
return str(filename+"-!"+str(chunk_index))
|
|
|
|
def handle_file_string(file, tokenizer, redis_conn, text_embedding_field, index_name):
|
|
"""
|
|
Handle a file string by cleaning it up, creating embeddings, and uploading them to Redis.
|
|
|
|
Args:
|
|
file (tuple): A tuple containing the filename and file body string.
|
|
tokenizer: The tokenizer object to use for encoding and decoding text.
|
|
redis_conn: The Redis connection object.
|
|
text_embedding_field (str): The field in Redis where the text embeddings will be stored.
|
|
index_name: The name of the index or identifier for the embeddings.
|
|
|
|
Returns:
|
|
None
|
|
|
|
Raises:
|
|
Exception: If there is an error creating embeddings or uploading to Redis.
|
|
|
|
"""
|
|
filename = file[0]
|
|
file_body_string = file[1]
|
|
|
|
# Clean up the file string by replacing newlines, double spaces, and semi-colons
|
|
clean_file_body_string = file_body_string.replace(" ", " ").replace("\n", "; ").replace(';', ' ')
|
|
|
|
# Add the filename to the text to embed
|
|
text_to_embed = "Filename is: {}; {}".format(filename, clean_file_body_string)
|
|
|
|
try:
|
|
# Create embeddings for the text
|
|
text_embeddings, average_embedding = create_embeddings_for_text(text_to_embed, tokenizer)
|
|
# print("[handle_file_string] Created embedding for {}".format(filename))
|
|
except Exception as e:
|
|
print("[handle_file_string] Error creating embedding: {}".format(e))
|
|
|
|
# Get the vectors array of triples: file_chunk_id, embedding, metadata for each embedding
|
|
# Metadata is a dict with keys: filename, file_chunk_index
|
|
vectors = []
|
|
for i, (text_chunk, embedding) in enumerate(text_embeddings):
|
|
id = get_unique_id_for_file_chunk(filename, i)
|
|
vectors.append({'id': id, "vector": embedding, 'metadata': {"filename": filename,
|
|
"text_chunk": text_chunk,
|
|
"file_chunk_index": i}})
|
|
|
|
try:
|
|
# Load vectors into Redis
|
|
load_vectors(redis_conn, vectors, text_embedding_field)
|
|
except Exception as e:
|
|
print(f'Ran into a problem uploading to Redis: {e}')
|
|
|
|
|
|
# Make a class to generate batches for insertion
|
|
class BatchGenerator:
|
|
|
|
|
|
def __init__(self, batch_size: int = 10) -> None:
|
|
self.batch_size = batch_size
|
|
|
|
# Makes chunks out of an input DataFrame
|
|
def to_batches(self, df: pd.DataFrame) -> Iterator[pd.DataFrame]:
|
|
splits = self.splits_num(df.shape[0])
|
|
if splits <= 1:
|
|
yield df
|
|
else:
|
|
for chunk in np.array_split(df, splits):
|
|
yield chunk
|
|
|
|
# Determines how many chunks DataFrame contains
|
|
def splits_num(self, elements: int) -> int:
|
|
return round(elements / self.batch_size)
|
|
|
|
__call__ = to_batches
|