Compare commits

...

3 Commits

7 changed files with 96 additions and 18 deletions

View File

@ -50,5 +50,6 @@ setuptools.setup(
entry_points={"console_scripts": [ entry_points={"console_scripts": [
"sticker-import=sticker.stickerimport:cmd", "sticker-import=sticker.stickerimport:cmd",
"sticker-pack=sticker.pack:cmd", "sticker-pack=sticker.pack:cmd",
"sticker-download-thumbnails=sticker.download_thumbnails:cmd",
]}, ]},
) )

View File

@ -0,0 +1,58 @@
# maunium-stickerpicker - A fast and simple Matrix sticker picker widget.
# Copyright (C) 2025 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from pathlib import Path
from typing import Dict
import argparse
import asyncio
import json
from aiohttp import ClientSession
from yarl import URL
from .lib import matrix, util
parser = argparse.ArgumentParser()
parser.add_argument("--config",
help="Path to JSON file with Matrix homeserver and access_token",
type=str, default="config.json", metavar="file")
parser.add_argument("path", help="Path to the sticker pack JSON file", type=str)
async def main(args: argparse.Namespace) -> None:
await matrix.load_config(args.config)
with util.open_utf8(args.path) as pack_file:
pack = json.load(pack_file)
print(f"Loaded existing pack meta from {args.path}")
stickers_data: Dict[str, bytes] = {}
async with ClientSession() as sess:
for sticker in pack["stickers"]:
dl_url = URL(matrix.homeserver_url) / "_matrix/client/v1/media/download" / sticker["url"].removeprefix("mxc://")
print("Downloading", sticker["url"])
async with sess.get(dl_url, headers={"Authorization": f"Bearer {matrix.access_token}"}) as resp:
resp.raise_for_status()
stickers_data[sticker["url"]] = await resp.read()
print("All stickers downloaded, generating thumbnails...")
util.add_thumbnails(pack["stickers"], stickers_data, Path(args.path).parent)
print("Done!")
def cmd():
asyncio.run(main(parser.parse_args()))
if __name__ == "__main__":
cmd()

View File

@ -59,7 +59,7 @@ async def load_config(path: str) -> None:
print("Matrix config file not found. Please enter your homeserver and access token.") print("Matrix config file not found. Please enter your homeserver and access token.")
homeserver_url = input("Homeserver URL: ") homeserver_url = input("Homeserver URL: ")
access_token = input("Access token: ") access_token = input("Access token: ")
whoami_url = URL(homeserver_url) / "_matrix" / "client" / "r0" / "account" / "whoami" whoami_url = URL(homeserver_url) / "_matrix" / "client" / "v3" / "account" / "whoami"
if whoami_url.scheme not in ("https", "http"): if whoami_url.scheme not in ("https", "http"):
whoami_url = whoami_url.with_scheme("https") whoami_url = whoami_url.with_scheme("https")
user_id = await whoami(whoami_url, access_token) user_id = await whoami(whoami_url, access_token)
@ -71,7 +71,7 @@ async def load_config(path: str) -> None:
}, config_file) }, config_file)
print(f"Wrote config to {path}") print(f"Wrote config to {path}")
upload_url = URL(homeserver_url) / "_matrix" / "media" / "r0" / "upload" upload_url = URL(homeserver_url) / "_matrix" / "media" / "v3" / "upload"
async def whoami(url: URL, access_token: str) -> str: async def whoami(url: URL, access_token: str) -> str:

View File

@ -17,6 +17,8 @@ from functools import partial
from io import BytesIO from io import BytesIO
import os.path import os.path
import json import json
from pathlib import Path
from typing import Dict, List
from PIL import Image from PIL import Image
@ -24,19 +26,19 @@ from . import matrix
open_utf8 = partial(open, encoding='UTF-8') open_utf8 = partial(open, encoding='UTF-8')
def convert_image(data: bytes) -> (bytes, int, int): def convert_image(data: bytes, max_w=256, max_h=256) -> (bytes, int, int):
image: Image.Image = Image.open(BytesIO(data)).convert("RGBA") image: Image.Image = Image.open(BytesIO(data)).convert("RGBA")
new_file = BytesIO() new_file = BytesIO()
image.save(new_file, "png") image.save(new_file, "png")
w, h = image.size w, h = image.size
if w > 256 or h > 256: if w > max_w or h > max_h:
# Set the width and height to lower values so clients wouldn't show them as huge images # Set the width and height to lower values so clients wouldn't show them as huge images
if w > h: if w > h:
h = int(h / (w / 256)) h = int(h / (w / max_w))
w = 256 w = max_w
else: else:
w = int(w / (h / 256)) w = int(w / (h / max_h))
h = 256 h = max_h
return new_file.getvalue(), w, h return new_file.getvalue(), w, h
@ -78,3 +80,15 @@ def make_sticker(mxc: str, width: int, height: int, size: int,
}, },
"msgtype": "m.sticker", "msgtype": "m.sticker",
} }
def add_thumbnails(stickers: List[matrix.StickerInfo], stickers_data: Dict[str, bytes], output_dir: str) -> None:
thumbnails = Path(output_dir, "thumbnails")
thumbnails.mkdir(parents=True, exist_ok=True)
for sticker in stickers:
image_data, _, _ = convert_image(stickers_data[sticker["url"]], 128, 128)
name = sticker["url"].split("/")[-1]
thumbnail_path = thumbnails / name
thumbnail_path.write_bytes(image_data)

View File

@ -13,6 +13,7 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from pathlib import Path
from typing import Dict, Optional from typing import Dict, Optional
from hashlib import sha256 from hashlib import sha256
import mimetypes import mimetypes
@ -107,9 +108,11 @@ async def main(args: argparse.Namespace) -> None:
old_stickers = {sticker["id"]: sticker for sticker in pack["stickers"]} old_stickers = {sticker["id"]: sticker for sticker in pack["stickers"]}
pack["stickers"] = [] pack["stickers"] = []
stickers_data: Dict[str, bytes] = {}
for file in sorted(os.listdir(args.path)): for file in sorted(os.listdir(args.path)):
sticker = await upload_sticker(file, args.path, old_stickers=old_stickers) sticker = await upload_sticker(file, args.path, old_stickers=old_stickers)
if sticker: if sticker:
stickers_data[sticker["url"]] = Path(args.path, file).read_bytes()
pack["stickers"].append(sticker) pack["stickers"].append(sticker)
with util.open_utf8(meta_path, "w") as pack_file: with util.open_utf8(meta_path, "w") as pack_file:
@ -122,6 +125,8 @@ async def main(args: argparse.Namespace) -> None:
with util.open_utf8(picker_pack_path, "w") as pack_file: with util.open_utf8(picker_pack_path, "w") as pack_file:
json.dump(pack, pack_file) json.dump(pack, pack_file)
print(f"Copied pack to {picker_pack_path}") print(f"Copied pack to {picker_pack_path}")
util.add_thumbnails(pack["stickers"], stickers_data, args.add_to_index)
util.add_to_index(picker_file_name, args.add_to_index) util.add_to_index(picker_file_name, args.add_to_index)
@ -138,7 +143,7 @@ parser.add_argument("path", help="Path to the sticker pack directory", type=str)
def cmd(): def cmd():
asyncio.get_event_loop().run_until_complete(main(parser.parse_args())) asyncio.run(main(parser.parse_args()))
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -13,7 +13,7 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Dict from typing import Dict, Tuple
import argparse import argparse
import asyncio import asyncio
import os.path import os.path
@ -29,7 +29,7 @@ from telethon.tl.types.messages import StickerSet as StickerSetFull
from .lib import matrix, util from .lib import matrix, util
async def reupload_document(client: TelegramClient, document: Document) -> matrix.StickerInfo: async def reupload_document(client: TelegramClient, document: Document) -> Tuple[matrix.StickerInfo, bytes]:
print(f"Reuploading {document.id}", end="", flush=True) print(f"Reuploading {document.id}", end="", flush=True)
data = await client.download_media(document, file=bytes) data = await client.download_media(document, file=bytes)
print(".", end="", flush=True) print(".", end="", flush=True)
@ -37,7 +37,7 @@ async def reupload_document(client: TelegramClient, document: Document) -> matri
print(".", end="", flush=True) print(".", end="", flush=True)
mxc = await matrix.upload(data, "image/png", f"{document.id}.png") mxc = await matrix.upload(data, "image/png", f"{document.id}.png")
print(".", flush=True) print(".", flush=True)
return util.make_sticker(mxc, width, height, len(data)) return util.make_sticker(mxc, width, height, len(data)), data
def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None: def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None:
@ -75,15 +75,17 @@ async def reupload_pack(client: TelegramClient, pack: StickerSetFull, output_dir
except FileNotFoundError: except FileNotFoundError:
pass pass
stickers_data: Dict[str, bytes] = {}
reuploaded_documents: Dict[int, matrix.StickerInfo] = {} reuploaded_documents: Dict[int, matrix.StickerInfo] = {}
for document in pack.documents: for document in pack.documents:
try: try:
reuploaded_documents[document.id] = already_uploaded[document.id] reuploaded_documents[document.id] = already_uploaded[document.id]
print(f"Skipped reuploading {document.id}") print(f"Skipped reuploading {document.id}")
except KeyError: except KeyError:
reuploaded_documents[document.id] = await reupload_document(client, document) reuploaded_documents[document.id], data = await reupload_document(client, document)
# Always ensure the body and telegram metadata is correct # Always ensure the body and telegram metadata is correct
add_meta(document, reuploaded_documents[document.id], pack) add_meta(document, reuploaded_documents[document.id], pack)
stickers_data[reuploaded_documents[document.id]["url"]] = data
for sticker in pack.packs: for sticker in pack.packs:
if not sticker.emoticon: if not sticker.emoticon:
@ -107,6 +109,7 @@ async def reupload_pack(client: TelegramClient, pack: StickerSetFull, output_dir
}, pack_file, ensure_ascii=False) }, pack_file, ensure_ascii=False)
print(f"Saved {pack.set.title} as {pack.set.short_name}.json") print(f"Saved {pack.set.title} as {pack.set.short_name}.json")
util.add_thumbnails(list(reuploaded_documents.values()), stickers_data, output_dir)
util.add_to_index(os.path.basename(pack_path), output_dir) util.add_to_index(os.path.basename(pack_path), output_dir)
@ -158,7 +161,7 @@ async def main(args: argparse.Namespace) -> None:
def cmd() -> None: def cmd() -> None:
asyncio.get_event_loop().run_until_complete(main(parser.parse_args())) asyncio.run(main(parser.parse_args()))
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -29,10 +29,8 @@ const params = new URLSearchParams(document.location.search)
if (params.has('config')) { if (params.has('config')) {
INDEX = params.get("config") INDEX = params.get("config")
} }
// This is updated from packs/index.json
let HOMESERVER_URL = "https://matrix-client.matrix.org"
const makeThumbnailURL = mxc => `${HOMESERVER_URL}/_matrix/media/v3/thumbnail/${mxc.slice(6)}?height=128&width=128&method=scale` const makeThumbnailURL = mxc => `${PACKS_BASE_URL}/thumbnails/${mxc.split("/").slice(-1)[0]}`
// We need to detect iOS webkit because it has a bug related to scrolling non-fixed divs // We need to detect iOS webkit because it has a bug related to scrolling non-fixed divs
// This is also used to fix scrolling to sections on Element iOS // This is also used to fix scrolling to sections on Element iOS
@ -165,7 +163,6 @@ class App extends Component {
return return
} }
const indexData = await indexRes.json() const indexData = await indexRes.json()
HOMESERVER_URL = indexData.homeserver_url || HOMESERVER_URL
if (indexData.giphy_api_key !== undefined) { if (indexData.giphy_api_key !== undefined) {
setGiphyAPIKey(indexData.giphy_api_key, indexData.giphy_mxc_prefix) setGiphyAPIKey(indexData.giphy_api_key, indexData.giphy_mxc_prefix)
} }