mirror of
https://github.com/maunium/stickerpicker.git
synced 2025-01-28 19:04:59 +01:00
Merge aa8616a40acd2fbc31a4ed83f85edd6ac19035c8 into 89d3aece041c85ebe5a1ad4e620388af5227cbb0
This commit is contained in:
commit
a03bd493c0
2
.gitignore
vendored
2
.gitignore
vendored
@ -5,9 +5,11 @@
|
||||
*.pyc
|
||||
__pycache__
|
||||
*.egg-info
|
||||
build/
|
||||
|
||||
node_modules
|
||||
web/lib/import-map.json
|
||||
web/packs/*.json
|
||||
|
||||
*.session
|
||||
/*.json
|
||||
|
@ -4,3 +4,4 @@ pillow
|
||||
telethon
|
||||
cryptg
|
||||
python-magic
|
||||
lottie[all]
|
||||
|
@ -15,29 +15,262 @@
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from functools import partial
|
||||
from io import BytesIO
|
||||
import numpy as np
|
||||
import os.path
|
||||
import subprocess
|
||||
import json
|
||||
import tempfile
|
||||
import mimetypes
|
||||
|
||||
from PIL import Image
|
||||
try:
|
||||
import magic
|
||||
except ImportError:
|
||||
print("[Warning] Magic is not installed, using file extensions to guess mime types")
|
||||
magic = None
|
||||
from PIL import Image, ImageSequence, ImageFilter
|
||||
|
||||
from . import matrix
|
||||
|
||||
open_utf8 = partial(open, encoding='UTF-8')
|
||||
|
||||
def convert_image(data: bytes) -> (bytes, int, int):
|
||||
image: Image.Image = Image.open(BytesIO(data)).convert("RGBA")
|
||||
new_file = BytesIO()
|
||||
image.save(new_file, "png")
|
||||
w, h = image.size
|
||||
if w > 256 or h > 256:
|
||||
# Set the width and height to lower values so clients wouldn't show them as huge images
|
||||
if w > h:
|
||||
h = int(h / (w / 256))
|
||||
w = 256
|
||||
|
||||
def guess_mime(data: bytes) -> str:
|
||||
mime = None
|
||||
if magic:
|
||||
try:
|
||||
return magic.Magic(mime=True).from_buffer(data)
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
with tempfile.NamedTemporaryFile() as temp:
|
||||
temp.write(data)
|
||||
temp.close()
|
||||
mime, _ = mimetypes.guess_type(temp.name)
|
||||
return mime or "image/png"
|
||||
|
||||
|
||||
def _video_to_webp(data: bytes) -> bytes:
|
||||
mime = guess_mime(data)
|
||||
ext = mimetypes.guess_extension(mime)
|
||||
with tempfile.NamedTemporaryFile(suffix=ext) as video:
|
||||
video.write(data)
|
||||
video.flush()
|
||||
with tempfile.NamedTemporaryFile(suffix=".webp") as webp:
|
||||
print(".", end="", flush=True)
|
||||
ffmpeg_encoder_args = []
|
||||
if mime == "video/webm":
|
||||
encode = subprocess.run(["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=codec_name", "-of", "default=nokey=1:noprint_wrappers=1", video.name], capture_output=True, text=True).stdout.strip()
|
||||
ffmpeg_encoder = None
|
||||
if encode == "vp8":
|
||||
ffmpeg_encoder = "libvpx"
|
||||
elif encode == "vp9":
|
||||
ffmpeg_encoder = "libvpx-vp9"
|
||||
if ffmpeg_encoder:
|
||||
ffmpeg_encoder_args = ["-c:v", ffmpeg_encoder]
|
||||
result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", *ffmpeg_encoder_args, "-i", video.name, "-lossless", "1", webp.name],
|
||||
capture_output=True)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
|
||||
webp.seek(0)
|
||||
return webp.read()
|
||||
|
||||
|
||||
def video_to_webp(data: bytes) -> bytes:
|
||||
mime = guess_mime(data)
|
||||
ext = mimetypes.guess_extension(mime)
|
||||
# run ffmpeg to fix duration
|
||||
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
|
||||
temp.write(data)
|
||||
temp.flush()
|
||||
with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed:
|
||||
print(".", end="", flush=True)
|
||||
result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", "-i", temp.name, "-codec", "copy", temp_fixed.name],
|
||||
capture_output=True)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
|
||||
temp_fixed.seek(0)
|
||||
data = temp_fixed.read()
|
||||
return _video_to_webp(data)
|
||||
|
||||
|
||||
def process_frame(frame):
|
||||
"""
|
||||
Process GIF frame, repair edges, ensure no white or semi-transparent pixels, while keeping color information intact.
|
||||
"""
|
||||
frame = frame.convert('RGBA')
|
||||
|
||||
# Decompose Alpha channel
|
||||
alpha = frame.getchannel('A')
|
||||
|
||||
# Process Alpha channel with threshold, remove semi-transparent pixels
|
||||
# Threshold can be adjusted as needed (0-255), 128 is the middle value
|
||||
threshold = 128
|
||||
alpha = alpha.point(lambda x: 255 if x >= threshold else 0)
|
||||
|
||||
# Process Alpha channel with MinFilter, remove edge noise
|
||||
alpha = alpha.filter(ImageFilter.MinFilter(3))
|
||||
|
||||
# Process Alpha channel with MaxFilter, repair edges
|
||||
alpha = alpha.filter(ImageFilter.MaxFilter(3))
|
||||
|
||||
# Apply processed Alpha channel back to image
|
||||
frame.putalpha(alpha)
|
||||
|
||||
return frame
|
||||
|
||||
|
||||
def webp_to_others(data: bytes, mimetype: str) -> bytes:
|
||||
format = mimetypes.guess_extension(mimetype)[1:]
|
||||
print(format)
|
||||
with Image.open(BytesIO(data)) as webp:
|
||||
with BytesIO() as img:
|
||||
print(".", end="", flush=True)
|
||||
webp.info.pop('background', None)
|
||||
|
||||
if mimetype == "image/gif":
|
||||
frames = []
|
||||
duration = [100, ]
|
||||
|
||||
for frame in ImageSequence.Iterator(webp):
|
||||
frame = process_frame(frame)
|
||||
frames.append(frame)
|
||||
duration.append(frame.info.get('duration', duration[-1]))
|
||||
|
||||
frames[0].save(img, format=format, save_all=True, lossless=True, quality=100, method=6,
|
||||
append_images=frames[1:], loop=0, duration=duration[1:], disposal=2)
|
||||
else:
|
||||
webp.save(img, format=format, lossless=True, quality=100, method=6)
|
||||
|
||||
img.seek(0)
|
||||
return img.read()
|
||||
|
||||
|
||||
def is_uniform_animated_webp(data: bytes) -> bool:
|
||||
with Image.open(BytesIO(data)) as img:
|
||||
if img.n_frames <= 1:
|
||||
return True
|
||||
|
||||
img_iter = ImageSequence.Iterator(img)
|
||||
first_frame = np.array(img_iter[0].convert("RGBA"))
|
||||
|
||||
for frame in img_iter:
|
||||
current_frame = np.array(frame.convert("RGBA"))
|
||||
if not np.array_equal(first_frame, current_frame):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def webp_to_gif_or_png(data: bytes) -> bytes:
|
||||
with Image.open(BytesIO(data)) as image:
|
||||
# check if the webp is animated
|
||||
is_animated = getattr(image, "is_animated", False)
|
||||
if is_animated and not is_uniform_animated_webp(data):
|
||||
return webp_to_others(data, "image/gif")
|
||||
else:
|
||||
w = int(w / (h / 256))
|
||||
h = 256
|
||||
return new_file.getvalue(), w, h
|
||||
# convert to png
|
||||
return webp_to_others(data, "image/png")
|
||||
|
||||
|
||||
def opermize_gif(data: bytes) -> bytes:
|
||||
with tempfile.NamedTemporaryFile() as gif:
|
||||
gif.write(data)
|
||||
gif.flush()
|
||||
# use gifsicle to optimize gif
|
||||
result = subprocess.run(["gifsicle", "--batch", "--optimize=3", "--colors=256", gif.name],
|
||||
capture_output=True)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"Run gifsicle failed with code {result.returncode}, Error occurred:\n{result.stderr}")
|
||||
gif.seek(0)
|
||||
return gif.read()
|
||||
|
||||
|
||||
def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
|
||||
with Image.open(BytesIO(data)) as image:
|
||||
with BytesIO() as new_file:
|
||||
# Determine if the image is a GIF
|
||||
is_animated = getattr(image, "is_animated", False)
|
||||
if is_animated:
|
||||
frames = [frame.convert("RGBA") for frame in ImageSequence.Iterator(image)]
|
||||
# Save the new GIF
|
||||
frames[0].save(
|
||||
new_file,
|
||||
format='GIF',
|
||||
save_all=True,
|
||||
append_images=frames[1:],
|
||||
loop=image.info.get('loop', 0), # Default loop to 0 if not present
|
||||
duration=image.info.get('duration', 100), # Set a default duration if not present
|
||||
disposal=image.info.get('disposal', 2) # Default to disposal method 2 (restore to background)
|
||||
)
|
||||
# Get the size of the first frame to determine resizing
|
||||
w, h = frames[0].size
|
||||
else:
|
||||
suffix = mimetypes.guess_extension(mimetype)
|
||||
if suffix:
|
||||
suffix = suffix[1:]
|
||||
image = image.convert("RGBA")
|
||||
image.save(new_file, format=suffix)
|
||||
w, h = image.size
|
||||
if w > 256 or h > 256:
|
||||
# Set the width and height to lower values so clients wouldn't show them as huge images
|
||||
if w > h:
|
||||
h = int(h / (w / 256))
|
||||
w = 256
|
||||
else:
|
||||
w = int(w / (h / 256))
|
||||
h = 256
|
||||
return new_file.getvalue(), w, h
|
||||
|
||||
|
||||
def _convert_sticker(data: bytes) -> (bytes, str, int, int):
|
||||
mimetype = guess_mime(data)
|
||||
if mimetype.startswith("video/"):
|
||||
data = video_to_webp(data)
|
||||
print(".", end="", flush=True)
|
||||
elif mimetype.startswith("application/gzip"):
|
||||
print(".", end="", flush=True)
|
||||
# unzip file
|
||||
import gzip
|
||||
with gzip.open(BytesIO(data), "rb") as f:
|
||||
data = f.read()
|
||||
mimetype = guess_mime(data)
|
||||
suffix = mimetypes.guess_extension(mimetype)
|
||||
with tempfile.NamedTemporaryFile(suffix=suffix) as temp:
|
||||
temp.write(data)
|
||||
with tempfile.NamedTemporaryFile(suffix=".webp") as gif:
|
||||
# run lottie_convert.py input output
|
||||
print(".", end="", flush=True)
|
||||
import subprocess
|
||||
cmd = ["lottie_convert.py", temp.name, gif.name]
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
retcode = result.returncode
|
||||
if retcode != 0:
|
||||
raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
|
||||
gif.seek(0)
|
||||
data = gif.read()
|
||||
mimetype = guess_mime(data)
|
||||
if mimetype == "image/webp":
|
||||
data = webp_to_gif_or_png(data)
|
||||
mimetype = guess_mime(data)
|
||||
rlt = _convert_image(data, mimetype)
|
||||
data = rlt[0]
|
||||
if mimetype == "image/gif":
|
||||
print(".", end="", flush=True)
|
||||
data = opermize_gif(data)
|
||||
return data, mimetype, rlt[1], rlt[2]
|
||||
|
||||
|
||||
def convert_sticker(data: bytes) -> (bytes, str, int, int):
|
||||
try:
|
||||
return _convert_sticker(data)
|
||||
except Exception as e:
|
||||
mimetype = guess_mime(data)
|
||||
print(f"Error converting image, mimetype: {mimetype}")
|
||||
ext = mimetypes.guess_extension(mimetype)
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp:
|
||||
temp.write(data)
|
||||
print(f"Saved to {temp.name}")
|
||||
raise e
|
||||
|
||||
|
||||
def add_to_index(name: str, output_dir: str) -> None:
|
||||
@ -57,7 +290,7 @@ def add_to_index(name: str, output_dir: str) -> None:
|
||||
|
||||
|
||||
def make_sticker(mxc: str, width: int, height: int, size: int,
|
||||
body: str = "") -> matrix.StickerInfo:
|
||||
mimetype: str, body: str = "") -> matrix.StickerInfo:
|
||||
return {
|
||||
"body": body,
|
||||
"url": mxc,
|
||||
@ -65,7 +298,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int,
|
||||
"w": width,
|
||||
"h": height,
|
||||
"size": size,
|
||||
"mimetype": "image/png",
|
||||
"mimetype": mimetype,
|
||||
|
||||
# Element iOS compatibility hack
|
||||
"thumbnail_url": mxc,
|
||||
@ -73,7 +306,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int,
|
||||
"w": width,
|
||||
"h": height,
|
||||
"size": size,
|
||||
"mimetype": "image/png",
|
||||
"mimetype": mimetype,
|
||||
},
|
||||
},
|
||||
"msgtype": "m.sticker",
|
||||
|
@ -77,11 +77,11 @@ async def upload_sticker(file: str, directory: str, old_stickers: Dict[str, matr
|
||||
}
|
||||
print(f".. using existing upload")
|
||||
else:
|
||||
image_data, width, height = util.convert_image(image_data)
|
||||
image_data, mimetype, width, height = util.convert_sticker(image_data)
|
||||
print(".", end="", flush=True)
|
||||
mxc = await matrix.upload(image_data, "image/png", file)
|
||||
mxc = await matrix.upload(image_data, mimetype, file)
|
||||
print(".", end="", flush=True)
|
||||
sticker = util.make_sticker(mxc, width, height, len(image_data), name)
|
||||
sticker = util.make_sticker(mxc, width, height, len(image_data), mimetype, name)
|
||||
sticker["id"] = sticker_id
|
||||
print(" uploaded", flush=True)
|
||||
return sticker
|
||||
|
@ -33,11 +33,11 @@ async def reupload_document(client: TelegramClient, document: Document) -> matri
|
||||
print(f"Reuploading {document.id}", end="", flush=True)
|
||||
data = await client.download_media(document, file=bytes)
|
||||
print(".", end="", flush=True)
|
||||
data, width, height = util.convert_image(data)
|
||||
data, mimetype, width, height = util.convert_sticker(data)
|
||||
print(".", end="", flush=True)
|
||||
mxc = await matrix.upload(data, "image/png", f"{document.id}.png")
|
||||
mxc = await matrix.upload(data, mimetype, f"{document.id}.png")
|
||||
print(".", flush=True)
|
||||
return util.make_sticker(mxc, width, height, len(data))
|
||||
return util.make_sticker(mxc, width, height, len(data), mimetype)
|
||||
|
||||
|
||||
def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None:
|
||||
|
Loading…
x
Reference in New Issue
Block a user