2023-12-11 21:53:30 +00:00
|
|
|
from io import BytesIO
|
2024-03-26 15:51:52 +00:00
|
|
|
from pathlib import Path
|
2023-12-11 21:53:30 +00:00
|
|
|
from typing import Any, List, Tuple, Union
|
|
|
|
|
|
|
|
import requests
|
|
|
|
from langchain_core.documents import Document
|
|
|
|
|
|
|
|
from langchain_community.document_loaders.base import BaseLoader
|
|
|
|
|
|
|
|
|
|
|
|
class ImageCaptionLoader(BaseLoader):
|
|
|
|
"""Load image captions.
|
|
|
|
|
|
|
|
By default, the loader utilizes the pre-trained
|
|
|
|
Salesforce BLIP image captioning model.
|
|
|
|
https://huggingface.co/Salesforce/blip-image-captioning-base
|
|
|
|
"""
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
self,
|
2024-03-26 15:51:52 +00:00
|
|
|
images: Union[str, Path, bytes, List[Union[str, bytes, Path]]],
|
2023-12-11 21:53:30 +00:00
|
|
|
blip_processor: str = "Salesforce/blip-image-captioning-base",
|
|
|
|
blip_model: str = "Salesforce/blip-image-captioning-base",
|
|
|
|
):
|
|
|
|
"""Initialize with a list of image data (bytes) or file paths
|
|
|
|
|
|
|
|
Args:
|
|
|
|
images: Either a single image or a list of images. Accepts
|
|
|
|
image data (bytes) or file paths to images.
|
|
|
|
blip_processor: The name of the pre-trained BLIP processor.
|
|
|
|
blip_model: The name of the pre-trained BLIP model.
|
|
|
|
"""
|
2024-03-26 15:51:52 +00:00
|
|
|
if isinstance(images, (str, Path, bytes)):
|
2023-12-11 21:53:30 +00:00
|
|
|
self.images = [images]
|
|
|
|
else:
|
|
|
|
self.images = images
|
|
|
|
|
|
|
|
self.blip_processor = blip_processor
|
|
|
|
self.blip_model = blip_model
|
|
|
|
|
|
|
|
def load(self) -> List[Document]:
|
|
|
|
"""Load from a list of image data or file paths"""
|
|
|
|
try:
|
|
|
|
from transformers import BlipForConditionalGeneration, BlipProcessor
|
|
|
|
except ImportError:
|
|
|
|
raise ImportError(
|
|
|
|
"`transformers` package not found, please install with "
|
|
|
|
"`pip install transformers`."
|
|
|
|
)
|
|
|
|
|
|
|
|
processor = BlipProcessor.from_pretrained(self.blip_processor)
|
|
|
|
model = BlipForConditionalGeneration.from_pretrained(self.blip_model)
|
|
|
|
|
|
|
|
results = []
|
|
|
|
for image in self.images:
|
|
|
|
caption, metadata = self._get_captions_and_metadata(
|
|
|
|
model=model, processor=processor, image=image
|
|
|
|
)
|
|
|
|
doc = Document(page_content=caption, metadata=metadata)
|
|
|
|
results.append(doc)
|
|
|
|
|
|
|
|
return results
|
|
|
|
|
|
|
|
def _get_captions_and_metadata(
|
2024-03-26 15:51:52 +00:00
|
|
|
self, model: Any, processor: Any, image: Union[str, Path, bytes]
|
2023-12-11 21:53:30 +00:00
|
|
|
) -> Tuple[str, dict]:
|
|
|
|
"""Helper function for getting the captions and metadata of an image."""
|
|
|
|
try:
|
|
|
|
from PIL import Image
|
|
|
|
except ImportError:
|
|
|
|
raise ImportError(
|
|
|
|
"`PIL` package not found, please install with `pip install pillow`"
|
|
|
|
)
|
|
|
|
|
|
|
|
image_source = image # Save the original source for later reference
|
|
|
|
|
|
|
|
try:
|
|
|
|
if isinstance(image, bytes):
|
|
|
|
image = Image.open(BytesIO(image)).convert("RGB")
|
2024-03-26 15:51:52 +00:00
|
|
|
elif isinstance(image, str) and (
|
|
|
|
image.startswith("http://") or image.startswith("https://")
|
|
|
|
):
|
2023-12-11 21:53:30 +00:00
|
|
|
image = Image.open(requests.get(image, stream=True).raw).convert("RGB")
|
|
|
|
else:
|
|
|
|
image = Image.open(image).convert("RGB")
|
|
|
|
except Exception:
|
|
|
|
if isinstance(image_source, bytes):
|
|
|
|
msg = "Could not get image data from bytes"
|
|
|
|
else:
|
|
|
|
msg = f"Could not get image data for {image_source}"
|
|
|
|
raise ValueError(msg)
|
|
|
|
|
|
|
|
inputs = processor(image, "an image of", return_tensors="pt")
|
|
|
|
output = model.generate(**inputs)
|
|
|
|
|
|
|
|
caption: str = processor.decode(output[0])
|
|
|
|
if isinstance(image_source, bytes):
|
|
|
|
metadata: dict = {"image_source": "Image bytes provided"}
|
|
|
|
else:
|
2024-03-26 15:51:52 +00:00
|
|
|
metadata = {"image_path": str(image_source)}
|
2023-12-11 21:53:30 +00:00
|
|
|
|
|
|
|
return caption, metadata
|