mirror of
https://github.com/maunium/stickerpicker.git
synced 2025-07-21 00:06:02 +02:00
Compare commits
3 Commits
89d3aece04
...
4b96d23621
Author | SHA1 | Date | |
---|---|---|---|
4b96d23621 | |||
3ce380645d | |||
a8effa2efa |
1
setup.py
1
setup.py
@ -50,5 +50,6 @@ setuptools.setup(
|
||||
entry_points={"console_scripts": [
|
||||
"sticker-import=sticker.stickerimport:cmd",
|
||||
"sticker-pack=sticker.pack:cmd",
|
||||
"sticker-download-thumbnails=sticker.download_thumbnails:cmd",
|
||||
]},
|
||||
)
|
||||
|
58
sticker/download_thumbnails.py
Normal file
58
sticker/download_thumbnails.py
Normal file
@ -0,0 +1,58 @@
|
||||
# maunium-stickerpicker - A fast and simple Matrix sticker picker widget.
|
||||
# Copyright (C) 2025 Tulir Asokan
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from pathlib import Path
|
||||
from typing import Dict
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
from aiohttp import ClientSession
|
||||
from yarl import URL
|
||||
|
||||
from .lib import matrix, util
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--config",
|
||||
help="Path to JSON file with Matrix homeserver and access_token",
|
||||
type=str, default="config.json", metavar="file")
|
||||
parser.add_argument("path", help="Path to the sticker pack JSON file", type=str)
|
||||
|
||||
|
||||
async def main(args: argparse.Namespace) -> None:
|
||||
await matrix.load_config(args.config)
|
||||
with util.open_utf8(args.path) as pack_file:
|
||||
pack = json.load(pack_file)
|
||||
print(f"Loaded existing pack meta from {args.path}")
|
||||
|
||||
stickers_data: Dict[str, bytes] = {}
|
||||
async with ClientSession() as sess:
|
||||
for sticker in pack["stickers"]:
|
||||
dl_url = URL(matrix.homeserver_url) / "_matrix/client/v1/media/download" / sticker["url"].removeprefix("mxc://")
|
||||
print("Downloading", sticker["url"])
|
||||
async with sess.get(dl_url, headers={"Authorization": f"Bearer {matrix.access_token}"}) as resp:
|
||||
resp.raise_for_status()
|
||||
stickers_data[sticker["url"]] = await resp.read()
|
||||
|
||||
print("All stickers downloaded, generating thumbnails...")
|
||||
util.add_thumbnails(pack["stickers"], stickers_data, Path(args.path).parent)
|
||||
print("Done!")
|
||||
|
||||
def cmd():
|
||||
asyncio.run(main(parser.parse_args()))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cmd()
|
@ -59,7 +59,7 @@ async def load_config(path: str) -> None:
|
||||
print("Matrix config file not found. Please enter your homeserver and access token.")
|
||||
homeserver_url = input("Homeserver URL: ")
|
||||
access_token = input("Access token: ")
|
||||
whoami_url = URL(homeserver_url) / "_matrix" / "client" / "r0" / "account" / "whoami"
|
||||
whoami_url = URL(homeserver_url) / "_matrix" / "client" / "v3" / "account" / "whoami"
|
||||
if whoami_url.scheme not in ("https", "http"):
|
||||
whoami_url = whoami_url.with_scheme("https")
|
||||
user_id = await whoami(whoami_url, access_token)
|
||||
@ -71,7 +71,7 @@ async def load_config(path: str) -> None:
|
||||
}, config_file)
|
||||
print(f"Wrote config to {path}")
|
||||
|
||||
upload_url = URL(homeserver_url) / "_matrix" / "media" / "r0" / "upload"
|
||||
upload_url = URL(homeserver_url) / "_matrix" / "media" / "v3" / "upload"
|
||||
|
||||
|
||||
async def whoami(url: URL, access_token: str) -> str:
|
||||
|
@ -17,6 +17,8 @@ from functools import partial
|
||||
from io import BytesIO
|
||||
import os.path
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
|
||||
from PIL import Image
|
||||
|
||||
@ -24,19 +26,19 @@ from . import matrix
|
||||
|
||||
open_utf8 = partial(open, encoding='UTF-8')
|
||||
|
||||
def convert_image(data: bytes) -> (bytes, int, int):
|
||||
def convert_image(data: bytes, max_w=256, max_h=256) -> (bytes, int, int):
|
||||
image: Image.Image = Image.open(BytesIO(data)).convert("RGBA")
|
||||
new_file = BytesIO()
|
||||
image.save(new_file, "png")
|
||||
w, h = image.size
|
||||
if w > 256 or h > 256:
|
||||
if w > max_w or h > max_h:
|
||||
# Set the width and height to lower values so clients wouldn't show them as huge images
|
||||
if w > h:
|
||||
h = int(h / (w / 256))
|
||||
w = 256
|
||||
h = int(h / (w / max_w))
|
||||
w = max_w
|
||||
else:
|
||||
w = int(w / (h / 256))
|
||||
h = 256
|
||||
w = int(w / (h / max_h))
|
||||
h = max_h
|
||||
return new_file.getvalue(), w, h
|
||||
|
||||
|
||||
@ -78,3 +80,15 @@ def make_sticker(mxc: str, width: int, height: int, size: int,
|
||||
},
|
||||
"msgtype": "m.sticker",
|
||||
}
|
||||
|
||||
|
||||
def add_thumbnails(stickers: List[matrix.StickerInfo], stickers_data: Dict[str, bytes], output_dir: str) -> None:
|
||||
thumbnails = Path(output_dir, "thumbnails")
|
||||
thumbnails.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for sticker in stickers:
|
||||
image_data, _, _ = convert_image(stickers_data[sticker["url"]], 128, 128)
|
||||
|
||||
name = sticker["url"].split("/")[-1]
|
||||
thumbnail_path = thumbnails / name
|
||||
thumbnail_path.write_bytes(image_data)
|
@ -13,6 +13,7 @@
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional
|
||||
from hashlib import sha256
|
||||
import mimetypes
|
||||
@ -107,9 +108,11 @@ async def main(args: argparse.Namespace) -> None:
|
||||
old_stickers = {sticker["id"]: sticker for sticker in pack["stickers"]}
|
||||
pack["stickers"] = []
|
||||
|
||||
stickers_data: Dict[str, bytes] = {}
|
||||
for file in sorted(os.listdir(args.path)):
|
||||
sticker = await upload_sticker(file, args.path, old_stickers=old_stickers)
|
||||
if sticker:
|
||||
stickers_data[sticker["url"]] = Path(args.path, file).read_bytes()
|
||||
pack["stickers"].append(sticker)
|
||||
|
||||
with util.open_utf8(meta_path, "w") as pack_file:
|
||||
@ -122,6 +125,8 @@ async def main(args: argparse.Namespace) -> None:
|
||||
with util.open_utf8(picker_pack_path, "w") as pack_file:
|
||||
json.dump(pack, pack_file)
|
||||
print(f"Copied pack to {picker_pack_path}")
|
||||
|
||||
util.add_thumbnails(pack["stickers"], stickers_data, args.add_to_index)
|
||||
util.add_to_index(picker_file_name, args.add_to_index)
|
||||
|
||||
|
||||
@ -138,7 +143,7 @@ parser.add_argument("path", help="Path to the sticker pack directory", type=str)
|
||||
|
||||
|
||||
def cmd():
|
||||
asyncio.get_event_loop().run_until_complete(main(parser.parse_args()))
|
||||
asyncio.run(main(parser.parse_args()))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -13,7 +13,7 @@
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from typing import Dict
|
||||
from typing import Dict, Tuple
|
||||
import argparse
|
||||
import asyncio
|
||||
import os.path
|
||||
@ -29,7 +29,7 @@ from telethon.tl.types.messages import StickerSet as StickerSetFull
|
||||
from .lib import matrix, util
|
||||
|
||||
|
||||
async def reupload_document(client: TelegramClient, document: Document) -> matrix.StickerInfo:
|
||||
async def reupload_document(client: TelegramClient, document: Document) -> Tuple[matrix.StickerInfo, bytes]:
|
||||
print(f"Reuploading {document.id}", end="", flush=True)
|
||||
data = await client.download_media(document, file=bytes)
|
||||
print(".", end="", flush=True)
|
||||
@ -37,7 +37,7 @@ async def reupload_document(client: TelegramClient, document: Document) -> matri
|
||||
print(".", end="", flush=True)
|
||||
mxc = await matrix.upload(data, "image/png", f"{document.id}.png")
|
||||
print(".", flush=True)
|
||||
return util.make_sticker(mxc, width, height, len(data))
|
||||
return util.make_sticker(mxc, width, height, len(data)), data
|
||||
|
||||
|
||||
def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None:
|
||||
@ -75,15 +75,17 @@ async def reupload_pack(client: TelegramClient, pack: StickerSetFull, output_dir
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
stickers_data: Dict[str, bytes] = {}
|
||||
reuploaded_documents: Dict[int, matrix.StickerInfo] = {}
|
||||
for document in pack.documents:
|
||||
try:
|
||||
reuploaded_documents[document.id] = already_uploaded[document.id]
|
||||
print(f"Skipped reuploading {document.id}")
|
||||
except KeyError:
|
||||
reuploaded_documents[document.id] = await reupload_document(client, document)
|
||||
reuploaded_documents[document.id], data = await reupload_document(client, document)
|
||||
# Always ensure the body and telegram metadata is correct
|
||||
add_meta(document, reuploaded_documents[document.id], pack)
|
||||
stickers_data[reuploaded_documents[document.id]["url"]] = data
|
||||
|
||||
for sticker in pack.packs:
|
||||
if not sticker.emoticon:
|
||||
@ -107,6 +109,7 @@ async def reupload_pack(client: TelegramClient, pack: StickerSetFull, output_dir
|
||||
}, pack_file, ensure_ascii=False)
|
||||
print(f"Saved {pack.set.title} as {pack.set.short_name}.json")
|
||||
|
||||
util.add_thumbnails(list(reuploaded_documents.values()), stickers_data, output_dir)
|
||||
util.add_to_index(os.path.basename(pack_path), output_dir)
|
||||
|
||||
|
||||
@ -158,7 +161,7 @@ async def main(args: argparse.Namespace) -> None:
|
||||
|
||||
|
||||
def cmd() -> None:
|
||||
asyncio.get_event_loop().run_until_complete(main(parser.parse_args()))
|
||||
asyncio.run(main(parser.parse_args()))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -29,10 +29,8 @@ const params = new URLSearchParams(document.location.search)
|
||||
if (params.has('config')) {
|
||||
INDEX = params.get("config")
|
||||
}
|
||||
// This is updated from packs/index.json
|
||||
let HOMESERVER_URL = "https://matrix-client.matrix.org"
|
||||
|
||||
const makeThumbnailURL = mxc => `${HOMESERVER_URL}/_matrix/media/v3/thumbnail/${mxc.slice(6)}?height=128&width=128&method=scale`
|
||||
const makeThumbnailURL = mxc => `${PACKS_BASE_URL}/thumbnails/${mxc.split("/").slice(-1)[0]}`
|
||||
|
||||
// We need to detect iOS webkit because it has a bug related to scrolling non-fixed divs
|
||||
// This is also used to fix scrolling to sections on Element iOS
|
||||
@ -165,7 +163,6 @@ class App extends Component {
|
||||
return
|
||||
}
|
||||
const indexData = await indexRes.json()
|
||||
HOMESERVER_URL = indexData.homeserver_url || HOMESERVER_URL
|
||||
if (indexData.giphy_api_key !== undefined) {
|
||||
setGiphyAPIKey(indexData.giphy_api_key, indexData.giphy_mxc_prefix)
|
||||
}
|
||||
|
Reference in New Issue
Block a user