|
|
@ -66,7 +66,7 @@ async def upload_image(
|
|
|
|
)
|
|
|
|
)
|
|
|
|
return result
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
|
|
except Exception as e:
|
|
|
|
raise RuntimeError(f"Add image failed: {e}")
|
|
|
|
raise RuntimeError(f"Upload image failed: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_image_upload_api_payload(image_bin: str, tone: str):
|
|
|
|
def build_image_upload_api_payload(image_bin: str, tone: str):
|
|
|
@ -101,82 +101,62 @@ def build_image_upload_api_payload(image_bin: str, tone: str):
|
|
|
|
return data, boundary
|
|
|
|
return data, boundary
|
|
|
|
|
|
|
|
|
|
|
|
def is_data_uri_an_image(data_uri: str):
|
|
|
|
def is_data_uri_an_image(data_uri: str):
|
|
|
|
try:
|
|
|
|
# Check if the data URI starts with 'data:image' and contains an image format (e.g., jpeg, png, gif)
|
|
|
|
# Check if the data URI starts with 'data:image' and contains an image format (e.g., jpeg, png, gif)
|
|
|
|
if not re.match(r'data:image/(\w+);base64,', data_uri):
|
|
|
|
if not re.match(r'data:image/(\w+);base64,', data_uri):
|
|
|
|
raise ValueError("Invalid data URI image.")
|
|
|
|
raise ValueError("Invalid data URI image.")
|
|
|
|
# Extract the image format from the data URI
|
|
|
|
# Extract the image format from the data URI
|
|
|
|
image_format = re.match(r'data:image/(\w+);base64,', data_uri).group(1)
|
|
|
|
image_format = re.match(r'data:image/(\w+);base64,', data_uri).group(1)
|
|
|
|
# Check if the image format is one of the allowed formats (jpg, jpeg, png, gif)
|
|
|
|
# Check if the image format is one of the allowed formats (jpg, jpeg, png, gif)
|
|
|
|
if image_format.lower() not in ['jpeg', 'jpg', 'png', 'gif']:
|
|
|
|
if image_format.lower() not in ['jpeg', 'jpg', 'png', 'gif']:
|
|
|
|
raise ValueError("Invalid image format (from mime file type).")
|
|
|
|
raise ValueError("Invalid image format (from mime file type).")
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
raise e
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_accepted_format(binary_data: bytes) -> bool:
|
|
|
|
def is_accepted_format(binary_data: bytes) -> bool:
|
|
|
|
try:
|
|
|
|
if binary_data.startswith(b'\xFF\xD8\xFF'):
|
|
|
|
check = False
|
|
|
|
pass # It's a JPEG image
|
|
|
|
if binary_data.startswith(b'\xFF\xD8\xFF'):
|
|
|
|
elif binary_data.startswith(b'\x89PNG\r\n\x1a\n'):
|
|
|
|
check = True # It's a JPEG image
|
|
|
|
pass # It's a PNG image
|
|
|
|
elif binary_data.startswith(b'\x89PNG\r\n\x1a\n'):
|
|
|
|
elif binary_data.startswith(b'GIF87a') or binary_data.startswith(b'GIF89a'):
|
|
|
|
check = True # It's a PNG image
|
|
|
|
pass # It's a GIF image
|
|
|
|
elif binary_data.startswith(b'GIF87a') or binary_data.startswith(b'GIF89a'):
|
|
|
|
elif binary_data.startswith(b'\x89JFIF') or binary_data.startswith(b'JFIF\x00'):
|
|
|
|
check = True # It's a GIF image
|
|
|
|
pass # It's a JPEG image
|
|
|
|
elif binary_data.startswith(b'\x89JFIF') or binary_data.startswith(b'JFIF\x00'):
|
|
|
|
elif binary_data.startswith(b'\xFF\xD8'):
|
|
|
|
check = True # It's a JPEG image
|
|
|
|
pass # It's a JPEG image
|
|
|
|
elif binary_data.startswith(b'\xFF\xD8'):
|
|
|
|
elif binary_data.startswith(b'RIFF') and binary_data[8:12] == b'WEBP':
|
|
|
|
check = True # It's a JPEG image
|
|
|
|
pass # It's a WebP image
|
|
|
|
elif binary_data.startswith(b'RIFF') and binary_data[8:12] == b'WEBP':
|
|
|
|
else:
|
|
|
|
check = True # It's a WebP image
|
|
|
|
raise ValueError("Invalid image format (from magic code).")
|
|
|
|
# else we raise ValueError
|
|
|
|
|
|
|
|
if not check:
|
|
|
|
|
|
|
|
raise ValueError("Invalid image format (from magic code).")
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
raise e
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_data_uri(data_uri: str) -> bytes:
|
|
|
|
def extract_data_uri(data_uri: str) -> bytes:
|
|
|
|
try:
|
|
|
|
data = data_uri.split(",")[1]
|
|
|
|
data = data_uri.split(",")[1]
|
|
|
|
data = base64.b64decode(data)
|
|
|
|
data = base64.b64decode(data)
|
|
|
|
return data
|
|
|
|
return data
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
raise e
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_orientation(data: bytes) -> int:
|
|
|
|
def get_orientation(data: bytes) -> int:
|
|
|
|
try:
|
|
|
|
if data[:2] != b'\xFF\xD8':
|
|
|
|
if data[:2] != b'\xFF\xD8':
|
|
|
|
raise Exception('NotJpeg')
|
|
|
|
raise Exception('NotJpeg')
|
|
|
|
with Image.open(data) as img:
|
|
|
|
with Image.open(data) as img:
|
|
|
|
exif_data = img._getexif()
|
|
|
|
exif_data = img._getexif()
|
|
|
|
if exif_data is not None:
|
|
|
|
if exif_data is not None:
|
|
|
|
orientation = exif_data.get(274) # 274 corresponds to the orientation tag in EXIF
|
|
|
|
orientation = exif_data.get(274) # 274 corresponds to the orientation tag in EXIF
|
|
|
|
if orientation is not None:
|
|
|
|
if orientation is not None:
|
|
|
|
return orientation
|
|
|
|
return orientation
|
|
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_image(orientation: int, img: Image.Image, new_width: int, new_height: int) -> Image.Image:
|
|
|
|
def process_image(orientation: int, img: Image.Image, new_width: int, new_height: int) -> Image.Image:
|
|
|
|
try:
|
|
|
|
# Initialize the canvas
|
|
|
|
# Initialize the canvas
|
|
|
|
new_img = Image.new("RGB", (new_width, new_height), color="#FFFFFF")
|
|
|
|
new_img = Image.new("RGB", (new_width, new_height), color="#FFFFFF")
|
|
|
|
if orientation:
|
|
|
|
if orientation:
|
|
|
|
if orientation > 4:
|
|
|
|
if orientation > 4:
|
|
|
|
img = img.transpose(Image.FLIP_LEFT_RIGHT)
|
|
|
|
img = img.transpose(Image.FLIP_LEFT_RIGHT)
|
|
|
|
if orientation in [3, 4]:
|
|
|
|
if orientation in [3, 4]:
|
|
|
|
img = img.transpose(Image.ROTATE_180)
|
|
|
|
img = img.transpose(Image.ROTATE_180)
|
|
|
|
if orientation in [5, 6]:
|
|
|
|
if orientation in [5, 6]:
|
|
|
|
img = img.transpose(Image.ROTATE_270)
|
|
|
|
img = img.transpose(Image.ROTATE_270)
|
|
|
|
if orientation in [7, 8]:
|
|
|
|
if orientation in [7, 8]:
|
|
|
|
img = img.transpose(Image.ROTATE_90)
|
|
|
|
img = img.transpose(Image.ROTATE_90)
|
|
|
|
new_img.paste(img, (0, 0))
|
|
|
|
new_img.paste(img, (0, 0))
|
|
|
|
return new_img
|
|
|
|
return new_img
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
raise e
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def compress_image_to_base64(img, compression_rate) -> str:
|
|
|
|
def compress_image_to_base64(image: Image.Image, compression_rate: float) -> str:
|
|
|
|
try:
|
|
|
|
output_buffer = io.BytesIO()
|
|
|
|
output_buffer = io.BytesIO()
|
|
|
|
image.save(output_buffer, format="JPEG", quality=int(compression_rate * 100))
|
|
|
|
img.save(output_buffer, format="JPEG", quality=int(compression_rate * 100))
|
|
|
|
return base64.b64encode(output_buffer.getvalue()).decode('utf-8')
|
|
|
|
return base64.b64encode(output_buffer.getvalue()).decode('utf-8')
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
raise e
|
|
|
|
|