Compare commits

...

2 commits

Author SHA1 Message Date
7f5b867c37 wip 2024-09-05 01:53:19 +02:00
fb54dd01ec surf around for a bit 2024-08-07 00:38:25 +02:00
5 changed files with 286 additions and 45 deletions

View file

@ -25,12 +25,18 @@ newly issued certificates and attempts to open the domain in Firefox using
Selenium.
## Building
## Development
```shell
# Build
nix build .#oci
./result | podman load
podman run --rm autosurfer:dev
# podman push autosurfer:dev quay.io/caspervk/autosurfer:latest
# Release
podman push autosurfer:dev quay.io/caspervk/autosurfer:latest
# 👉😎👉
podman run --rm -v ./autosurfer/:/autosurfer/:ro --network host --env DISPLAY --security-opt label=type:container_runtime_t autosurfer:dev
```

177
autosurfer/ct.py Normal file
View file

@ -0,0 +1,177 @@
from datetime import UTC, datetime
import logging
import random
from functools import wraps
from json import JSONDecodeError
import asyncio
import base64
from cryptography import x509
import httpx
import structlog
logger = structlog.stdlib.get_logger()
client = httpx.AsyncClient()
async def get_servers() -> set[str]:
"""TODO."""
# The format of these server lists are not part of the RFC.
# https://certificate.transparency.dev/useragents/
server_lists = {
"https://www.gstatic.com/ct/log_list/v3/log_list.json",
"https://valid.apple.com/ct/log_list/current_log_list.json",
}
servers = set()
now = datetime.now(tz=UTC)
for server_list in server_lists:
try:
r = await client.get(server_list)
r.raise_for_status()
servers.update(
log["url"]
for operator in r.json()["operators"]
for log in operator["logs"]
if ("usable" in log["state"]
and datetime.fromisoformat(log["temporal_interval"]["start_inclusive"]) <= now
and datetime.fromisoformat(log["temporal_interval"]["end_exclusive"]) > now)
)
except:
logger.exception("Error in log server list")
continue
if not servers:
raise ValueError("All log server lists failed")
return servers
def decode_cert(leaf: bytes) -> x509.Certificate:
# MerkleTreeLeaf for timestamped entry containing an x509 certificate:
#
# +------+-----------------------+
# | Byte | |
# +------+-----------------------+
# | 0 | Version |
# +------+-----------------------+
# | 1 | Leaf type |
# +------+-----------------------+
# | 2 | |
# | 3 | |
# | 4 | |
# | 5 | Timestamp |
# | 6 | |
# | 7 | |
# | 8 | |
# | 9 | |
# +------+-----------------------+
# | 10 | Entry type |
# | 11 | |
# +------+-----------------------+
# | 12 | |
# | 13 | Cert length (n) |
# | 14 | |
# +------+-----------------------+
# | 15 | |
# | .. | x509 DER cert |
# | n | |
# +------+-----------------------+
# | n+1 | CT extensions |
# | .. | |
# +------+-----------------------+
#
# https://www.rfc-editor.org/rfc/rfc6962.html#section-3.4
# https://www.rfc-editor.org/rfc/rfc5246.html#section-4
# RFC 6962 only defines version 1 (0x00) of the merkle tree leaf and
# a single leaf type: timestamped entry (0x00).
if (version := leaf[0]) != 0:
raise ValueError(f"Unknown version {version}")
if (leaf_type := leaf[1]) != 0:
raise ValueError(f"Unknown leaf type {leaf_type}")
if leaf[10:12] != b"\x00\x00":
# Timestamped entry type 0x0000 designates a x509 certificate. Type
# 0x001 is a precert, which we can not use, and therefore ignore.
raise TypeError("Not x509 entry")
cert_length = int.from_bytes(leaf[12:15], "big")
cert_bytes = leaf[15 : 15 + cert_length]
cert = x509.load_der_x509_certificate(cert_bytes)
return cert
def forever(f):
@wraps(f)
async def wrapper(*args, **kwargs):
while True:
try:
await f(*args, **kwargs)
except Exception:
logger.exception("Retrying")
await asyncio.sleep(30)
except:
break
return wrapper
class Watcher:
page_size = 100
def __init__(self, server: str, queue: asyncio.Queue) -> None:
self.server = server
self.queue = queue
self.log = logger.bind(server=server)
self.tree_size = 0
self.tree_watcher = asyncio.create_task(self.watch_tree_size())
self.start = 0
self.end = 0
@forever
async def watch_tree_size(self) -> None:
r = await client.get(f"{self.server}ct/v1/get-sth")
self.tree_size = r.json()["tree_size"]
self.log.debug("Tree size", size=self.tree_size)
await asyncio.sleep(600)
@forever
async def watcher(self) -> None:
index = random.randrange(self.start, self.tree_size - self.page_size)
r = await client.get(f"{self.server}ct/v1/get-entries", params={"start": index, "end": index + self.page_size,},)
entries = r.json()["entries"]
now = datetime.now(tz=UTC)
expired = 0
for entry in entries:
leaf = base64.b64decode(entry["leaf_input"])
try:
cert = decode_cert(leaf)
except TypeError:
# Ignore precerts
continue
if cert.not_valid_before_utc > now:
continue
if cert.not_valid_after_utc < now:
expired += 1
continue
await self.queue.put(cert)
# All expired: move up
if len(entries) == expired > 5:
self.start = index
q = asyncio.Queue(maxsize=100)
async def asd():
while True:
# await asyncio.sleep(10)
cert = await q.get()
print(cert)
asyncio.run(main(q))

View file

@ -1,12 +1,11 @@
import asyncio
import json
import math
import os
import random
import websockets
from selenium import webdriver
from selenium.common.exceptions import InvalidSessionIdException
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.remote.webelement import WebElement
service = webdriver.FirefoxService(
# Selenium only checks /usr/bin/geckodriver by default
@ -24,40 +23,29 @@ driver = webdriver.Firefox(service=service, options=options)
driver.set_page_load_timeout(3)
async def ct_stream(domains: asyncio.Queue) -> None:
"""Watch Certificate Transparency (CT) logs for new certificates."""
while True:
async def surf(url: str) -> None:
"""Surf around URL for a bit."""
for i in range(math.ceil(random.expovariate(0.5))):
print("🏄" if i == 0 else "🔗", url)
try:
async with websockets.connect("wss://certstream.calidog.io") as websocket:
async for message_data in websocket:
ct_handler(message_data, domains)
except (KeyboardInterrupt, asyncio.CancelledError):
return
except Exception as e:
await asyncio.to_thread(driver.get, url)
# Find all links on page. This is *much* faster than find_elements("a") + get_attribute("href")
links = await asyncio.to_thread(
driver.execute_script,
"return [...document.links].filter(a => !!a.host && a.href != location.href && !a.href.includes('#')).map(a => a.href);",
)
except InvalidSessionIdException:
# Browser closed: no way to recover
raise
except WebDriverException as e:
print(e)
def ct_handler(data: websockets.Data, domains: asyncio.Queue) -> None:
"""Save certificate's domain to queue if needed."""
# There are A LOT of certificates coming through the transparency logs;
# immediately bail without spending time decoding the message if we have
# enough domains queued up already.
if domains.full():
return
message = json.loads(data)
if message["message_type"] != "certificate_update":
return
# Certificates can verify multiple domains: We arbitrarily select the first
# non-wildcard one since we cannot connect to such host in the browser.
cert_domains = message["data"]["leaf_cert"]["all_domains"]
try:
cert_domain = next(d for d in cert_domains if "*" not in d)
except StopIteration:
return
domains.put_nowait(cert_domain)
print(type(e))
# Timeout, network error, JavaScript failure etc.
break
try:
url = random.choice(links)
except IndexError:
break
async def surfer() -> None:
@ -65,13 +53,10 @@ async def surfer() -> None:
domains = asyncio.Queue(maxsize=50)
ct_stream_task = asyncio.create_task(ct_stream(domains))
while True:
domain = await domains.get()
url = f"https://{domain}"
print("🏄", url)
try:
await asyncio.to_thread(driver.get, url)
except WebDriverException:
pass
domain = await domains.get()
url = f"https://{domain}"
await surf(url)
except (KeyboardInterrupt, asyncio.CancelledError):
break
ct_stream_task.cancel()

71
autosurfer/test.py Normal file

File diff suppressed because one or more lines are too long

View file

@ -45,8 +45,10 @@
})
pkgs.geckodriver
(pkgs.python3.withPackages (ps: [
ps.cryptography
ps.httpx
ps.selenium
ps.websockets
ps.structlog
]))
# pkgs.bashInteractive
# pkgs.coreutils