hello-web/main.py

53 lines
1.2 KiB
Python
Raw Normal View History

2023-03-17 02:18:22 +01:00
import asyncio
import webbrowser
import certstream
import httpx
from termcolor import colored
client = httpx.AsyncClient()
semaphore = asyncio.Semaphore(10)
async def handler(message: dict, context: dict) -> None:
if message["message_type"] != "certificate_update":
return
domains = message["data"]["leaf_cert"]["all_domains"]
try:
domain = next(d for d in domains if "*" not in d)
except StopIteration:
return
url = f"https://{domain}"
try:
async with semaphore:
r = await client.head(url, timeout=1)
except httpx.HTTPError:
print(colored("???", on_color="on_dark_grey"), domain)
return
if r.status_code != 200:
print(colored(str(r.status_code), on_color="on_yellow"), domain)
return
print(colored(str(r.status_code), on_color="on_cyan"), domain)
webbrowser.open_new_tab(url)
async def main() -> None:
loop = asyncio.get_running_loop()
def callback(message: dict, context: dict) -> None:
asyncio.run_coroutine_threadsafe(handler(message, context), loop=loop)
await asyncio.to_thread(
certstream.listen_for_events,
message_callback=callback,
url="wss://certstream.calidog.io/",
)
asyncio.run(main())