##########################
Contributing a new crawler
##########################

This document is a protocol. Follow it end-to-end and you will produce a working
crawler that integrates with the IJF pipeline.

:doc:`crawl` is the companion reference document — it explains the conceptual
foundations (trees, algorithm, full API) behind the steps prescribed here. Read it
alongside this protocol for the *why* behind each requirement.

Background
**********

Key terms
---------

.. list-table::
   :header-rows: 1
   :widths: 10 90

   * - Term
     - Meaning
   * - ``db``
     - The target database: ``lob``, ``rep``, ``don``, ``cha``, ``apt``, ``pro``
   * - ``s``
     - The jurisdiction: ``fd``, ``bc``, ``ab``, ``sk``, ``mb``, ``on``, ``qc``,
       ``nb``, ``ns``, ``pe``, ``nl``, ``yt``, ``nt``, ``nu``, ``vi``, ``va``,
       ``f3``, ``fp``, ``ft``
   * - ``rid``
     - A unique identifier for one record. Every page stored by the crawler is
       assigned to a record via its ``rid``.
   * - ``rdate``
     - The date associated with a record.

Valid values for ``db`` and ``s`` are defined as ``StrEnum`` classes in
``src/pipeline/types.py``.

The tree model
--------------

The crawl engine maintains a *sitemap tree* (pages as nodes, HTTP requests as edges)
and maps it onto a *record tree* where every node carries an ``rid`` and ``rdate``.
Your crawler's job is to traverse the sitemap and assign that metadata along the way.

Full explanation — with diagrams and a worked example — is in the
:ref:`Trees ` section of :doc:`crawl`.

Directory layout
****************

Create a Python package at:

.. code-block:: text

   src/pipeline/crawl/crawlers/<db>/<s>/__init__.py

The ``db`` and ``s`` folder names must match the lowercase enum member names in
``types.py`` exactly. The ``apt-legacy`` directory is a historical exception; all new
crawlers use the plain ``db`` name.

Your package must export three functions (see `The three required functions`_).
Everything else is optional:

.. code-block:: text

   crawlers/
     lob/
       fd/
         __init__.py      ← required: seed, sections, parse
         adv.json         ← optional: HTTP request template
         rcnt.json
     pro/
       bc/
         __init__.py
         scheduler.py     ← optional: custom scheduler subclass
         utils.py

Imports
*******

.. code-block:: python

   from datetime import datetime
   from urllib.parse import urljoin  # used by the examples below

   from lxml import etree  # used by the examples below (etree.HTML is lxml-specific)

   from pipeline.crawl import tree
   from pipeline.crawl.scheduler import Scheduler
   from pipeline.utils import http

``tree`` holds ``Edge``, ``Data``, and the signal strings. ``http.Request`` is the
HTTP request object. ``Scheduler`` is passed into ``seed()`` at runtime.

The three required functions
****************************

Every ``__init__.py`` must define ``seed``, ``sections``, and ``parse``. The engine
calls them in that order for each crawl iteration.

``seed``
--------

.. code-block:: python

   def seed(scheduler: Scheduler) -> tree.Edge | list[tree.Edge] | None:
       ...

**Role:** produce the starting HTTP request(s) for one iteration of the crawl.

- Called once per crawl iteration (once the input queue drains).
- Return an ``Edge`` or a list of ``Edge`` objects to start the next batch.
- Return ``None`` to signal exhaustion — the crawl ends.
- Use ``scheduler`` to decide what to request next (which page, which date range).

Key scheduler attributes available inside ``seed``:

.. list-table::
   :header-rows: 1
   :widths: 30 20 50

   * - Attribute
     - Type
     - Meaning
   * - ``scheduler.runtime``
     - ``Runtime``
     - ``HIST``, ``DATE``, or ``IDX`` — see `Scheduler and runtime modes`_
   * - ``scheduler.seeds``
     - ``int``
     - Number of times ``seed()`` has been called so far
   * - ``scheduler.page_number``
     - ``int``
     - Current page number (derived from highest index seen)
   * - ``scheduler.indexer.page_start``
     - ``int``
     - Index of the first record on the current page
   * - ``scheduler.indexer.max_idx``
     - ``int``
     - Highest record index seen
   * - ``scheduler.calendar.from_date``
     - ``datetime``
     - Start of the requested date range
   * - ``scheduler.calendar.to_date``
     - ``datetime``
     - End of the requested date range

Minimal example — paginated GET:

.. code-block:: python

   def seed(scheduler: Scheduler) -> tree.Edge | None:
       pn = scheduler.page_number
       if pn > MAX_PAGES:
           return None
       return tree.Edge(
           label="results",
           req=http.Request(method="GET", url=f"{BASE_URL}?page={pn}"),
           p_rid="",
           p_rdate=datetime.utcfromtimestamp(0),
       )

Returning ``None`` from ``seed`` immediately ends the crawl.

``sections``
------------

.. code-block:: python

   def sections(data: tree.Data) -> list[tree.Data]:
       ...

**Role:** split one crawled page into individually-parseable chunks.

``Data`` is a ``NamedTuple(label: str, data: str)``. Unpack it as:

.. code-block:: python

   label, text = data

- If the page contains multiple records (e.g. a search-results list), split it into
  one ``Data`` per record and return that list.
- If the page describes only one record (e.g. a detail page), return ``[data]``
  unchanged.
- Use ``label`` to route: different page types need different splitting logic.

Example — split HTML table rows:

.. code-block:: python

   def sections(data: tree.Data) -> list[tree.Data]:
       label, text = data
       match label:
           case "results":
               rows = etree.HTML(text).xpath("//tr[@class='record-row']")
               return [tree.Data("row", etree.tostring(r).decode()) for r in rows]
           case _:
               return [data]

``parse``
---------

.. code-block:: python

   def parse(
       data: tree.Data, p_rid: str, p_rdate: datetime
   ) -> tuple[str, datetime, list[tree.Edge]]:
       ...

**Role:** extract the ``rid``, ``rdate``, and child edges from one section.

Return a 3-tuple ``(rid, rdate, edges)``:

.. list-table::
   :header-rows: 1
   :widths: 20 80

   * - Return value
     - Meaning
   * - ``rid: str``
     - This record's identifier (unhashed plain string). See signals below.
   * - ``rdate: datetime``
     - The date of this record.
   * - ``edges: list[Edge]``
     - Child HTTP requests to follow from this node.

**Inheriting the parent.** If this node is not itself a record holder (e.g. a detail
page that belongs to an ``rid`` already extracted from the results page), pass the
parent values through:

.. code-block:: python

   return p_rid, p_rdate, []

**Signals.** Return one of two special strings as ``rid`` to control processing:

.. list-table::
   :header-rows: 1
   :widths: 15 85

   * - ``rid`` value
     - Effect
   * - ``"__SKIP"``
     - Node is **not** saved to storage, but its child edges **are** followed. Use
       for intermediate pages that carry no record data but link to pages that do.
   * - ``"__STOP"``
     - Node is **not** saved and its child edges are **not** followed. Use to
       discard known-bad or irrelevant nodes.

Example:

.. code-block:: python

   def parse(
       data: tree.Data, p_rid: str, p_rdate: datetime
   ) -> tuple[str, datetime, list[tree.Edge]]:
       label, text = data
       match label:
           case "row":
               root = etree.HTML(text)
               rid = root.xpath("//td[@class='record-id']/text()")[0].strip()
               # strip() guards against whitespace around the ISO date string
               rdate = datetime.fromisoformat(
                   root.xpath("//td[@class='date']/text()")[0].strip()
               )
               href = root.xpath("//a/@href")[0]
               return (
                   rid,
                   rdate,
                   [tree.Edge(
                       label="detail",
                       req=http.Request(method="GET", url=urljoin(BASE_URL, href)),
                       p_rid=rid,
                       p_rdate=rdate,
                   )],
               )
           case _:
               # "detail" pages (and any other label) inherit the parent's values,
               # so the function never falls through and returns None
               return p_rid, p_rdate, []

The ``Edge`` type
*****************

An ``Edge`` connects a parent node to a child via an HTTP ``Request``.
The ``label`` you assign is what ``data.label`` will be inside ``sections()`` and
``parse()`` when that response is processed — labels are your routing key.

Full API for both ``Edge`` and ``Request`` is in the :ref:`Edge and Request `
section of :doc:`crawl`.

Optional module-level attributes
********************************

Define these at the top of ``__init__.py`` to configure the scheduler:

.. list-table::
   :header-rows: 1
   :widths: 20 10 15 55

   * - Name
     - Type
     - Default
     - Effect
   * - ``PAGE_SIZE``
     - ``int``
     - ``50``
     - Page size used by the indexer for ``page_number`` arithmetic and end-of-page
       detection
   * - ``EARLIEST_DATE``
     - ``datetime``
     - ``1970-01-01 UTC``
     - Lower bound for ``scheduler.calendar.from_date`` in historical mode
   * - ``STATEMENT``
     - ``str``
     - ``""``
     - SQL query executed before the crawl starts; result available as
       ``scheduler.querier.data``

Example:

.. code-block:: python

   from datetime import datetime

   PAGE_SIZE = 25
   EARLIEST_DATE = datetime(2015, 3, 1)

``STATEMENT`` is only supported when ``db`` is ``cha`` or ``pro`` — the scheduler
only constructs a ``Querier`` for those databases.

Scheduler and runtime modes
***************************

The ``Scheduler`` is passed to ``seed()`` and tracks the crawl's progress. Its
``runtime`` property tells you which mode the user invoked:

.. list-table::
   :header-rows: 1
   :widths: 25 20 55

   * - ``scheduler.runtime``
     - CLI flag
     - Meaning
   * - ``Runtime.HIST``
     - ``-H``
     - Retrieve all records ever, starting from ``EARLIEST_DATE``
   * - ``Runtime.DATE``
     - ``-f DATE [-t DATE]``
     - Retrieve records within a date range (default mode)
   * - ``Runtime.IDX``
     - ``-c N`` / ``-s N``
     - Retrieve records between explicit index bounds

Import ``Runtime`` to branch on it:

.. code-block:: python

   from pipeline.crawl.scheduler.runtime import Runtime

   def seed(scheduler):
       if scheduler.runtime == Runtime.HIST:
           # use scheduler.indexer.page_start for offset-based pagination
           ...
       else:
           # use scheduler.calendar.from_date / to_date
           ...

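The branch above can be fleshed out as a small, runnable sketch. Everything here is a stand-in so the code runs outside the pipeline: ``Runtime``, ``Indexer``, ``Calendar`` and ``FakeScheduler`` only mimic the attributes listed in this document, and ``BASE_URL`` and the query parameter names (``offset``, ``from``, ``to``) are hypothetical and depend entirely on the target site.

```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from urllib.parse import urlencode


# Stand-ins for the real scheduler objects (assumption: only the attributes
# named in this document are modelled; the engine supplies the real ones).
class Runtime(Enum):
    HIST = "hist"
    DATE = "date"
    IDX = "idx"


@dataclass
class Indexer:
    page_start: int = 0


@dataclass
class Calendar:
    from_date: datetime
    to_date: datetime


@dataclass
class FakeScheduler:
    runtime: Runtime
    indexer: Indexer
    calendar: Calendar


BASE_URL = "https://example.org/search"  # hypothetical endpoint


def seed_url(scheduler) -> str:
    """Build the next request URL from the scheduler state."""
    if scheduler.runtime == Runtime.HIST:
        # Offset-based pagination; "offset" is an assumed parameter name.
        params = {"offset": scheduler.indexer.page_start}
    else:
        # Date-window query; "from" and "to" are assumed parameter names.
        params = {
            "from": scheduler.calendar.from_date.date().isoformat(),
            "to": scheduler.calendar.to_date.date().isoformat(),
        }
    return f"{BASE_URL}?{urlencode(params)}"


cal = Calendar(datetime(2024, 1, 1), datetime(2024, 6, 30))
hist_url = seed_url(FakeScheduler(Runtime.HIST, Indexer(page_start=50), cal))
date_url = seed_url(FakeScheduler(Runtime.DATE, Indexer(), cal))
```

In a real crawler the resulting URL would be wrapped in ``http.Request`` and returned inside a ``tree.Edge`` from ``seed()``; keeping the URL logic in a separate helper like this makes it easy to test without the engine.
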
``DATE`` is the default for normal runs (yesterday → today). Most crawlers only need
to handle ``DATE`` and ``HIST``; ``IDX`` is rare.

CLI usage
*********

Once your module is in place, run it with:

.. code-block:: bash

   # Dated mode — records updated in the last day (default)
   pipe <db> <s> crawl

   # Dated mode — specific range
   pipe <db> <s> crawl -f 2024-01-01 -t 2024-06-30

   # Historical — everything from EARLIEST_DATE to now
   pipe <db> <s> crawl -H

Useful flags during development:

.. list-table::
   :header-rows: 1
   :widths: 25 75

   * - Flag
     - Effect
   * - ``-d``
     - Debug mode: verbose logging, single worker
   * - ``-n N``
     - Throttle to N concurrent requests
   * - ``-w SECONDS``
     - Wait between requests
   * - ``--session-cap N``
     - Limit total requests per session

Full example for a new crawler in debug mode:

.. code-block:: bash

   pipe -d lob on crawl -f 2024-01-01

Advanced topics
***************

HTTP sessions and cookies
-------------------------

The engine manages ``aiohttp.ClientSession`` internally, with automatic cookie
handling and four throttle controls (``-n``, ``-ns``, ``--session-cap``, ``-w``).
See :ref:`Advanced ` in :doc:`crawl` for full session and throttling documentation.

Database pre-query (``STATEMENT``)
-----------------------------------

If your crawler needs to know what records already exist in the database before
crawling (e.g. to skip already-ingested records), define ``STATEMENT`` as a SQL
query. The result rows are available as ``scheduler.querier.data`` inside ``seed()``.
This is currently supported only for ``db = cha`` or ``db = pro``.

.. code-block:: python

   STATEMENT = "select rid from pro_bc_seen;"

   def seed(scheduler):
       seen = {row.rid for row in scheduler.querier.data}
       ...

Custom scheduler
----------------

For complex multi-phase crawls (e.g. crawling tenders then awards), you can write a
custom scheduler singleton alongside your module. See
``src/pipeline/crawl/crawlers/pro/bc/scheduler.py`` as an example.

Signals in depth
----------------

See :ref:`SKIP and STOP ` in :doc:`crawl` for a full explanation with a real crawler
example. In brief: ``"__SKIP"`` discards the node but follows its edges;
``"__STOP"`` discards both.

Worked example — ``tst/w3``
***************************

The test crawler at ``src/pipeline/crawl/crawlers/tst/w3/__init__.py`` is the
canonical minimal implementation. It crawls the W3C blog (10 pages, paginated GET).

.. code-block:: python

   from datetime import datetime
   from urllib.parse import urljoin

   from lxml import etree

   from pipeline.crawl import tree
   from pipeline.crawl.scheduler import Scheduler
   from pipeline.utils import http

   BASE_URL = "https://www.w3.org/blog/page/"
   PAGE_SIZE = 10

   def seed(scheduler: Scheduler) -> tree.Edge | None:
       pn = scheduler.page_number  # starts at 1; advances as records are indexed
       if pn > 10:
           return None  # exhausted — end the crawl
       return tree.Edge(
           label="blogs",  # this label flows into sections() and parse()
           req=http.Request(method="GET", url=urljoin(BASE_URL, str(pn))),
           p_rid="",
           p_rdate=datetime.utcfromtimestamp(0),
       )

   def sections(data: tree.Data) -> list[tree.Data]:
       label, text = data
       match label:
           case "blogs":
               # split the results page into one Data per article
               return [
                   tree.Data("blog_hed", etree.tostring(elem))
                   for elem in etree.HTML(text).xpath("//article")
               ]
           case _:
               return [data]  # detail pages pass through unchanged

   def parse(
       data: tree.Data, p_rid: str, p_rdate: datetime
   ) -> tuple[str, datetime, list[tree.Edge]]:
       label, text = data
       match label:
           case "blog_hed":
               root = etree.HTML(text)
               rid = root.xpath("//article/@id")[0]
               rdate = datetime.fromisoformat(root.xpath("//time/@datetime")[0])
               href = root.xpath("//h2/a[contains(@href, 'w3.org/blog/')]/@href")[0]
               return (
                   rid,
                   rdate,
                   [tree.Edge(
                       label="blog",
                       req=http.Request(method="GET", url=href),
                       p_rid=rid,
                       p_rdate=rdate,
                   )],
               )
           case _:
               return p_rid, p_rdate, []  # detail page: inherit parent's rid/rdate

Trace through one iteration
---------------------------

1. ``seed()`` returns an ``Edge(label="blogs", ...)`` pointing to page 1 of the blog
   listing.
2. The engine fetches that URL and calls ``sections(Data("blogs", ...))``, which
   returns a list of ``Data("blog_hed", ...)`` — one per article on the page.
3. For each section the engine calls ``parse(Data("blog_hed", ...), "", epoch)``.
   ``parse`` extracts ``rid`` and ``rdate`` from the article element and returns a
   child ``Edge(label="blog", ...)`` pointing to the article detail page.
4. The detail page arrives as ``Data("blog", ...)``. ``sections`` passes it through.
   ``parse`` returns ``p_rid, p_rdate, []`` — inheriting the rid from step 3, no
   further edges.
5. Once the input queue empties, ``seed()`` is called again.
   ``scheduler.page_number`` has advanced (it reflects the highest index seen), so
   the next page URL is returned.
6. After page 10, ``pn > 10`` is true and ``seed()`` returns ``None`` — crawl ends.

Contributor checklist
*********************

Before submitting:

- Module is at ``src/pipeline/crawl/crawlers/<db>/<s>/__init__.py``
- ``db`` and ``s`` are valid members of ``DB`` and ``S`` in ``src/pipeline/types.py``
- ``__init__.py`` exports ``seed``, ``sections``, ``parse`` with the correct
  signatures
- ``PAGE_SIZE`` is set if the site paginates
- ``EARLIEST_DATE`` is set if the site supports historical queries
- No hard-coded credentials — use environment variables / ``.env``
- Crawler runs without error in debug dated mode:
  ``pipe -d <db> <s> crawl -f <date>``
- At least one record is stored after a short run
- Historical mode works (if applicable): ``pipe -d <db> <s> crawl -H``

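The "correct signatures" item in the checklist above can be checked mechanically before a first run. The following is a minimal sketch using only the standard library; ``check_exports`` and ``REQUIRED`` are hypothetical helpers, not part of the pipeline, and the expected parameter names are taken from the signatures shown in this document. The ``SimpleNamespace`` objects stand in for an imported crawler package.

```python
import inspect
from types import SimpleNamespace

# Expected parameter names, taken from the signatures in this document.
REQUIRED = {
    "seed": ["scheduler"],
    "sections": ["data"],
    "parse": ["data", "p_rid", "p_rdate"],
}


def check_exports(module) -> list[str]:
    """Return a list of problems; an empty list means the exports look right."""
    problems = []
    for name, expected in REQUIRED.items():
        fn = getattr(module, name, None)
        if not callable(fn):
            problems.append(f"missing function: {name}")
            continue
        actual = list(inspect.signature(fn).parameters)
        if actual != expected:
            problems.append(f"{name}: expected {expected}, got {actual}")
    return problems


# Stand-in "modules" for demonstration; a real check would import the package.
good = SimpleNamespace(
    seed=lambda scheduler: None,
    sections=lambda data: [data],
    parse=lambda data, p_rid, p_rdate: (p_rid, p_rdate, []),
)
bad = SimpleNamespace(seed=lambda scheduler: None, sections=lambda page: [page])

good_problems = check_exports(good)
bad_problems = check_exports(bad)
```

This only compares parameter names, so it is a quick sanity check rather than a replacement for the debug-mode run listed in the checklist.
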