mirror of
https://github.com/openai/openai-cookbook
synced 2024-11-08 01:10:29 +00:00
116 lines
4.4 KiB
Python
116 lines
4.4 KiB
Python
from typing import Iterator
|
|
from numpy import array, average
|
|
import openai
|
|
import pandas as pd
|
|
import numpy as np
|
|
|
|
from config import TEXT_EMBEDDING_CHUNK_SIZE, EMBEDDINGS_MODEL
|
|
from database import load_vectors
|
|
|
|
def get_col_average_from_list_of_lists(list_of_lists):
|
|
"""Return the average of each column in a list of lists."""
|
|
if len(list_of_lists) == 1:
|
|
return list_of_lists[0]
|
|
else:
|
|
list_of_lists_array = array(list_of_lists)
|
|
average_embedding = average(list_of_lists_array, axis=0)
|
|
return average_embedding.tolist()
|
|
|
|
# Create embeddings for a text using a tokenizer and an OpenAI engine
|
|
|
|
|
|
def create_embeddings_for_text(text, tokenizer):
|
|
"""Return a list of tuples (text_chunk, embedding) and an average embedding for a text."""
|
|
token_chunks = list(chunks(text, TEXT_EMBEDDING_CHUNK_SIZE, tokenizer))
|
|
text_chunks = [tokenizer.decode(chunk) for chunk in token_chunks]
|
|
|
|
embeddings_response = get_embeddings(text_chunks, EMBEDDINGS_MODEL)
|
|
embeddings = [embedding["embedding"] for embedding in embeddings_response]
|
|
text_embeddings = list(zip(text_chunks, embeddings))
|
|
|
|
average_embedding = get_col_average_from_list_of_lists(embeddings)
|
|
|
|
return (text_embeddings, average_embedding)
|
|
|
|
def get_embeddings(text_array, engine):
|
|
return openai.Engine(id=engine).embeddings(input=text_array)["data"]
|
|
|
|
# Split a text into smaller chunks of size n, preferably ending at the end of a sentence
|
|
def chunks(text, n, tokenizer):
|
|
tokens = tokenizer.encode(text)
|
|
"""Yield successive n-sized chunks from text."""
|
|
i = 0
|
|
while i < len(tokens):
|
|
# Find the nearest end of sentence within a range of 0.5 * n and 1.5 * n tokens
|
|
j = min(i + int(1.5 * n), len(tokens))
|
|
while j > i + int(0.5 * n):
|
|
# Decode the tokens and check for full stop or newline
|
|
chunk = tokenizer.decode(tokens[i:j])
|
|
if chunk.endswith(".") or chunk.endswith("\n"):
|
|
break
|
|
j -= 1
|
|
# If no end of sentence found, use n tokens as the chunk size
|
|
if j == i + int(0.5 * n):
|
|
j = min(i + n, len(tokens))
|
|
yield tokens[i:j]
|
|
i = j
|
|
|
|
def get_unique_id_for_file_chunk(filename, chunk_index):
|
|
return str(filename+"-!"+str(chunk_index))
|
|
|
|
def handle_file_string(file,tokenizer,redis_conn, text_embedding_field,index_name):
|
|
filename = file[0]
|
|
file_body_string = file[1]
|
|
|
|
# Clean up the file string by replacing newlines and double spaces and semi-colons
|
|
clean_file_body_string = file_body_string.replace(" ", " ").replace("\n", "; ").replace(';',' ')
|
|
#
|
|
# Add the filename to the text to embed
|
|
text_to_embed = "Filename is: {}; {}".format(
|
|
filename, clean_file_body_string)
|
|
|
|
# Create embeddings for the text
|
|
try:
|
|
text_embeddings, average_embedding = create_embeddings_for_text(
|
|
text_to_embed, tokenizer)
|
|
#print("[handle_file_string] Created embedding for {}".format(filename))
|
|
except Exception as e:
|
|
print("[handle_file_string] Error creating embedding: {}".format(e))
|
|
|
|
# Get the vectors array of triples: file_chunk_id, embedding, metadata for each embedding
|
|
# Metadata is a dict with keys: filename, file_chunk_index
|
|
vectors = []
|
|
for i, (text_chunk, embedding) in enumerate(text_embeddings):
|
|
id = get_unique_id_for_file_chunk(filename, i)
|
|
vectors.append(({'id': id
|
|
, "vector": embedding, 'metadata': {"filename": filename
|
|
, "text_chunk": text_chunk
|
|
, "file_chunk_index": i}}))
|
|
|
|
try:
|
|
load_vectors(redis_conn, vectors,text_embedding_field)
|
|
|
|
except Exception as e:
|
|
print(f'Ran into a problem uploading to Redis: {e}')
|
|
|
|
# Make a class to generate batches for insertion
|
|
class BatchGenerator:
|
|
|
|
|
|
def __init__(self, batch_size: int = 10) -> None:
|
|
self.batch_size = batch_size
|
|
|
|
# Makes chunks out of an input DataFrame
|
|
def to_batches(self, df: pd.DataFrame) -> Iterator[pd.DataFrame]:
|
|
splits = self.splits_num(df.shape[0])
|
|
if splits <= 1:
|
|
yield df
|
|
else:
|
|
for chunk in np.array_split(df, splits):
|
|
yield chunk
|
|
|
|
# Determines how many chunks DataFrame contains
|
|
def splits_num(self, elements: int) -> int:
|
|
return round(elements / self.batch_size)
|
|
|
|
__call__ = to_batches |