You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

109 lines
4.1 KiB

import base64
import re
from typing import List, Union
from pydantic import BaseModel
from .core import Invoker, Prompty, SimpleModel
class PromptyChatParser(Invoker):
    """Turn a role-delimited prompt string into a list of chat messages.

    The prompt text is split on role header lines such as ``system:`` or
    ``# user:``. Each section becomes a ``{"role": ..., "content": ...}``
    dict. Markdown images found inside a section are converted into
    ``image_url`` content items (local files are inlined as base64 data
    URIs).
    """

    def __init__(self, prompty: Prompty) -> None:
        self.prompty = prompty
        # role names recognised as section headers (matched case-insensitively)
        self.roles = ["assistant", "function", "system", "user", "human", "ai"]
        # directory that relative image references are resolved against
        self.path = self.prompty.file.parent

    def inline_image(self, image_item: str) -> str:
        """Return a URL for *image_item* that can be embedded in a message.

        Args:
            image_item: An ``http``/``data`` URL, or a path to a local image
                file relative to ``self.path``.

        Returns:
            The input unchanged when it already starts with ``http`` or
            ``data``; otherwise a ``data:image/...;base64,`` URI holding the
            file's contents.

        Raises:
            ValueError: If the local file is not a .png, .jpg or .jpeg.
            OSError: If the local file cannot be opened or read.
        """
        # pass through if it's a url or base64 encoded
        if image_item.startswith(("http", "data")):
            return image_item

        # otherwise, it's a local file - need to base64 encode it
        image_path = self.path / image_item
        with open(image_path, "rb") as f:
            # encode the file's bytes, then decode to text so the result can
            # be interpolated into the data URI
            base64_image = base64.b64encode(f.read()).decode("utf-8")

        mime_types = {
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
        }
        mime_type = mime_types.get(image_path.suffix)
        if mime_type is None:
            raise ValueError(
                f"Invalid image format {image_path.suffix} - currently only .png "
                "and .jpg / .jpeg are supported."
            )
        return f"data:{mime_type};base64,{base64_image}"

    def parse_content(self, content: str) -> Union[str, List]:
        """Split message content into text and inline-image items.

        Args:
            content: The text of a single message section.

        Returns:
            *content* unchanged when it contains no markdown image;
            otherwise a list of ``{"type": "text", ...}`` and
            ``{"type": "image_url", ...}`` dicts in document order.
            Whitespace-only text between images is dropped.
        """
        # regular expression to parse markdown images
        image = r"(?P<alt>!\[[^\]]*\])\((?P<filename>.*?)(?=\"|\))\)"

        content_items = []
        cursor = 0
        # Walk the matches by position rather than comparing split chunks by
        # value, so text that happens to equal an alt or filename string is
        # never mistaken for part of an image entry.
        for match in re.finditer(image, content, flags=re.MULTILINE):
            # text entry preceding this image
            text = content[cursor:match.start()].strip()
            if text:
                content_items.append({"type": "text", "text": text})

            # image entry - the filename group may carry a trailing
            # ' "title"', so keep only the part before the first space
            filename = match.group("filename").split(" ")[0].strip()
            content_items.append(
                {
                    "type": "image_url",
                    "image_url": {"url": self.inline_image(filename)},
                }
            )
            cursor = match.end()

        # every match appends an image item, so an empty list means no images
        if not content_items:
            return content

        # text entry after the last image
        trailing = content[cursor:].strip()
        if trailing:
            content_items.append({"type": "text", "text": trailing})
        return content_items

    def invoke(self, data: BaseModel) -> BaseModel:
        """Parse the prompt text in *data* into chat messages.

        Args:
            data: A ``SimpleModel`` whose ``item`` is the prompt text.

        Returns:
            A ``SimpleModel[list]`` whose ``item`` is a list of
            ``{"role": str, "content": str | list}`` dicts. A prompt that is
            empty, or that holds only a role header with no content, yields
            an empty list.

        Raises:
            ValueError: If roles and contents do not pair up.
        """
        assert isinstance(data, SimpleModel)
        separator = r"(?i)^\s*#?\s*(" + "|".join(self.roles) + r")\s*:\s*\n"

        # get valid chunks - remove empty items
        chunks = [
            item
            for item in re.split(separator, data.item, flags=re.MULTILINE)
            if item.strip()
        ]

        # if no starter role (or nothing at all), then inject system role
        if not chunks or chunks[0].strip().lower() not in self.roles:
            chunks.insert(0, "system")

        # if last chunk is role entry, then remove (no content?)
        if chunks[-1].strip().lower() in self.roles:
            chunks.pop()

        # what remains must be alternating role / content pairs
        if len(chunks) % 2 != 0:
            raise ValueError("Invalid prompt format")

        # create messages
        messages = [
            {
                "role": role.strip().lower(),
                "content": self.parse_content(content.strip()),
            }
            for role, content in zip(chunks[0::2], chunks[1::2])
        ]

        return SimpleModel[list](item=messages)