Contributing a new crawler

This document is a protocol. Follow it end-to-end and you will produce a working crawler that integrates with the IJF pipeline. Crawl is the companion reference document — it explains the conceptual foundations (trees, algorithm, full API) behind the steps prescribed here. Read it alongside this protocol for the why behind each requirement.

Background

Key terms

| Term | Meaning |
|------|---------|
| db | The target database: lob, rep, don, cha, apt, pro |
| s | The jurisdiction: fd, bc, ab, sk, mb, on, qc, nb, ns, pe, nl, yt, nt, nu, vi, va, f3, fp, ft |
| rid | A unique identifier for one record. Every page stored by the crawler is assigned to a record via its rid. |
| rdate | The date associated with a record. |

Valid values for db and s are defined as StrEnums in src/pipeline/types.py.

The tree model

The crawl engine maintains a sitemap tree (pages as nodes, HTTP requests as edges) and maps it onto a record tree where every node carries an rid and rdate. Your crawler’s job is to traverse the sitemap and assign that metadata along the way. Full explanation — with diagrams and a worked example — is in the Trees section of Crawl.

Directory layout

Create a Python package at:

src/pipeline/crawl/crawlers/<db>/<s>/__init__.py

The db and s folder names must match the lowercase enum member names in types.py exactly. The apt-legacy directory is a historical exception; all new crawlers use the plain db name.

Your package must export three functions (see The three required functions). Everything else is optional:

crawlers/
  lob/
    fd/
      __init__.py        ← required: seed, sections, parse
      adv.json           ← optional: HTTP request template
      rcnt.json
  pro/
    bc/
      __init__.py
      scheduler.py       ← optional: custom scheduler subclass
      utils.py
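
A quick way to confirm that a package exports the three required functions is to inspect the imported module. The helper below is a sketch, not part of the pipeline: the name missing_exports is hypothetical, and the in-memory demo module stands in for a real crawler package.

```python
from types import ModuleType

REQUIRED = ("seed", "sections", "parse")


def missing_exports(module: ModuleType) -> list[str]:
    """Return the required names that the module does not define as callables."""
    return [name for name in REQUIRED if not callable(getattr(module, name, None))]


# Usage with a throwaway in-memory module (hypothetical crawler).
demo = ModuleType("demo_crawler")
demo.seed = lambda scheduler: None
demo.sections = lambda data: [data]
print(missing_exports(demo))  # prints ['parse']
```

For a real crawler you would presumably pass the result of importlib.import_module on the package path that corresponds to the directory layout above.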

Imports

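from datetime import datetime
from urllib.parse import urljoin   # used by the examples below

from lxml import etree             # used by the examples below

from pipeline.crawl import tree
from pipeline.crawl.scheduler import Scheduler
from pipeline.utils import http

tree holds Edge, Data, and the signal strings. http.Request is the HTTP request object. Scheduler is passed into seed() at runtime. The examples in this document also use etree (from lxml) to parse HTML and urljoin to build absolute URLs.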

The three required functions

Every __init__.py must define seed, sections, and parse. The engine calls them in that order for each crawl iteration.

seed

def seed(scheduler: Scheduler) -> tree.Edge | list[tree.Edge] | None:
    ...

Role: produce the starting HTTP request(s) for one iteration of the crawl.

  • Called once per crawl iteration (once the input queue drains).

  • Return an Edge or a list of Edges to start the next batch.

  • Return None to signal exhaustion — the crawl ends.

  • Use scheduler to decide what to request next (which page, which date range).

Key scheduler attributes available inside seed:

| Attribute | Type | Meaning |
|-----------|------|---------|
| scheduler.runtime | Runtime | HIST, DATE, or IDX (see Scheduler and runtime modes) |
| scheduler.seeds | int | Number of times seed() has been called so far |
| scheduler.page_number | int | Current page number (derived from highest index seen) |
| scheduler.indexer.page_start | int | Index of the first record on the current page |
| scheduler.indexer.max_idx | int | Highest record index seen |
| scheduler.calendar.from_date | datetime | Start of the requested date range |
| scheduler.calendar.to_date | datetime | End of the requested date range |

Minimal example — paginated GET:

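def seed(scheduler: Scheduler) -> tree.Edge | None:
    # BASE_URL and MAX_PAGES are module-level constants you define for the target site.
    pn = scheduler.page_number
    if pn > MAX_PAGES:
        return None  # exhausted: end the crawl
    return tree.Edge(
        label="results",
        req=http.Request(method="GET", url=f"{BASE_URL}?page={pn}"),
        p_rid="",  # seed edges have no parent record
        # Naive epoch placeholder; same value as utcfromtimestamp(0), which is deprecated since Python 3.12.
        p_rdate=datetime(1970, 1, 1),
    )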

Returning None from seed immediately ends the crawl.

sections

def sections(data: tree.Data) -> list[tree.Data]:
    ...

Role: split one crawled page into individually-parseable chunks.

Data is a NamedTuple(label: str, data: str). Unpack it as:

label, text = data

  • If the page contains multiple records (e.g. a search-results list), split it into one Data per record and return that list.

  • If the page describes only one record (e.g. a detail page), return [data] unchanged.

  • Use label to route: different page types need different splitting logic.

Example — split HTML table rows:

def sections(data: tree.Data) -> list[tree.Data]:
    label, text = data
    match label:
        case "results":
            rows = etree.HTML(text).xpath("//tr[@class='record-row']")
            return [tree.Data("row", etree.tostring(r).decode()) for r in rows]
        case _:
            return [data]

parse

def parse(
    data: tree.Data, p_rid: str, p_rdate: datetime
) -> tuple[str, datetime, list[tree.Edge]]:
    ...

Role: extract the rid, rdate, and child edges from one section.

Return a 3-tuple (rid, rdate, edges):

| Return value | Meaning |
|--------------|---------|
| rid: str | This record’s identifier (unhashed plain string). See signals below. |
| rdate: datetime | The date of this record. |
| edges: list[Edge] | Child HTTP requests to follow from this node. |

Inheriting the parent. If this node is not itself a record holder (e.g. a detail page that belongs to an rid already extracted from the results page), pass the parent values through:

return p_rid, p_rdate, []

Signals. Return one of two special strings as rid to control processing:

| rid value | Effect |
|-----------|--------|
| "__SKIP" | Node is not saved to storage, but its child edges are followed. Use for intermediate pages that carry no record data but link to pages that do. |
| "__STOP" | Node is not saved and its child edges are not followed. Use to discard known-bad or irrelevant nodes. |

Example:

def parse(
    data: tree.Data, p_rid: str, p_rdate: datetime
) -> tuple[str, datetime, list[tree.Edge]]:
    label, text = data
    match label:
        case "row":
            root = etree.HTML(text)
            rid = root.xpath("//td[@class='record-id']/text()")[0].strip()
            rdate = datetime.fromisoformat(root.xpath("//td[@class='date']/text()")[0])
            href = root.xpath("//a/@href")[0]
            return (
                rid,
                rdate,
                [tree.Edge(
                    label="detail",
                    req=http.Request(method="GET", url=urljoin(BASE_URL, href)),
                    p_rid=rid,
                    p_rdate=rdate,
                )],
            )
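        case _:
            # "detail" pages (and any unrecognised label) inherit the parent's rid/rdate,
            # so parse never falls through and returns None.
            return p_rid, p_rdate, []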

The Edge type

An Edge connects a parent node to a child via an HTTP Request. The label you assign is what data.label will be inside sections() and parse() when that response is processed — labels are your routing key. Full API for both Edge and Request is in the Edge and Request section of Crawl.
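
To experiment with crawler functions outside the pipeline, minimal stand-ins for these types can be handy. The field names below are taken from the examples in this document; everything else (frozen dataclasses, the to_data helper) is an assumption, and the real classes in tree and http may carry more fields.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import NamedTuple


@dataclass(frozen=True)
class Request:  # stand-in for http.Request
    method: str
    url: str


@dataclass(frozen=True)
class Edge:  # stand-in for tree.Edge
    label: str
    req: Request
    p_rid: str
    p_rdate: datetime


class Data(NamedTuple):  # stand-in for tree.Data
    label: str
    data: str


def to_data(edge: Edge, body: str) -> Data:
    """Pair a response body with the label of the edge that produced it."""
    return Data(edge.label, body)


edge = Edge("detail", Request("GET", "https://example.org/r/1"), "R1", datetime(1970, 1, 1))
page = to_data(edge, "<html></html>")
```

The helper makes the routing rule concrete: whatever label the Edge carries is the label that sections() and parse() will see.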

Optional module-level attributes

Define these at the top of __init__.py to configure the scheduler:

| Name | Type | Default | Effect |
|------|------|---------|--------|
| PAGE_SIZE | int | 50 | Page size used by the indexer for page_number arithmetic and end-of-page detection |
| EARLIEST_DATE | datetime | 1970-01-01 UTC | Lower bound for scheduler.calendar.from_date in historical mode |
| STATEMENT | str | "" | SQL query executed before the crawl starts; result available as scheduler.querier.data |

Example:

from datetime import datetime

PAGE_SIZE = 25
EARLIEST_DATE = datetime(2015, 3, 1)

STATEMENT is only supported when db is cha or pro — the scheduler only constructs a Querier for those databases.
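
To make the role of PAGE_SIZE concrete, here is one plausible reading of the page arithmetic. It is an assumption, not the indexer's actual code: it treats max_idx as the count of records indexed so far and pages as 1-based, and both function names are hypothetical.

```python
PAGE_SIZE = 25


def page_number(max_idx: int, page_size: int = PAGE_SIZE) -> int:
    """1-based page the next request would target, given the highest index seen."""
    return max_idx // page_size + 1


def page_start(max_idx: int, page_size: int = PAGE_SIZE) -> int:
    """Offset of the first record on that page."""
    return (page_number(max_idx, page_size) - 1) * page_size
```

Under this reading, a PAGE_SIZE that does not match the site's real page size would make page_number drift, which is why the checklist asks you to set it for paginated sites.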

Scheduler and runtime modes

The Scheduler is passed to seed() and tracks the crawl’s progress. Its runtime property tells you which mode the user invoked:

| scheduler.runtime | CLI flag | Meaning |
|-------------------|----------|---------|
| Runtime.HIST | -H | Retrieve all records ever, starting from EARLIEST_DATE |
| Runtime.DATE | -f DATE [-t DATE] | Retrieve records within a date range (default mode) |
| Runtime.IDX | -c N / -s N | Retrieve records between explicit index bounds |

Import Runtime to branch on it:

from pipeline.crawl.scheduler.runtime import Runtime

def seed(scheduler):
    if scheduler.runtime == Runtime.HIST:
        # use scheduler.indexer.page_start for offset-based pagination
        ...
    else:
        # use scheduler.calendar.from_date / to_date
        ...

DATE is the default for normal runs (yesterday → today). Most crawlers only need to handle DATE and HIST; IDX is rare.
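
A fuller version of that branch might build the request URL differently per mode. The sketch below assumes a site that accepts offset, from, and to query parameters (all hypothetical, as is BASE_URL); Runtime is a stand-in enum and the scheduler is faked with SimpleNamespace.

```python
from datetime import datetime
from enum import Enum, auto
from types import SimpleNamespace
from urllib.parse import urlencode


class Runtime(Enum):  # stand-in for pipeline.crawl.scheduler.runtime.Runtime
    HIST = auto()
    DATE = auto()
    IDX = auto()


BASE_URL = "https://example.org/search"  # hypothetical


def build_url(scheduler) -> str:
    if scheduler.runtime == Runtime.HIST:
        # Offset-based pagination through the full history.
        params = {"offset": scheduler.indexer.page_start}
    else:
        # Date-bounded query.
        params = {
            "from": scheduler.calendar.from_date.strftime("%Y-%m-%d"),
            "to": scheduler.calendar.to_date.strftime("%Y-%m-%d"),
        }
    return f"{BASE_URL}?{urlencode(params)}"


hist = SimpleNamespace(runtime=Runtime.HIST, indexer=SimpleNamespace(page_start=50))
dated = SimpleNamespace(
    runtime=Runtime.DATE,
    calendar=SimpleNamespace(from_date=datetime(2024, 1, 1), to_date=datetime(2024, 6, 30)),
)
```

Here build_url(hist) yields an offset query and build_url(dated) a from/to query; seed() would wrap the result in an Edge.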

CLI usage

Once your module is in place, run it with:

# Dated mode — records updated in the last day (default)
pipe <db> <s> crawl

# Dated mode — specific range
pipe <db> <s> crawl -f 2024-01-01 -t 2024-06-30

# Historical — everything from EARLIEST_DATE to now
pipe <db> <s> crawl -H

Useful flags during development:

| Flag | Effect |
|------|--------|
| -d | Debug mode: verbose logging, single worker |
| -n N | Throttle to N concurrent requests |
| -w SECONDS | Wait between requests |
| --session-cap N | Limit total requests per session |

Full example for a new crawler in debug mode:

pipe -d lob on crawl -f 2024-01-01

Advanced topics

HTTP sessions and cookies

The engine manages aiohttp.ClientSession internally, with automatic cookie handling and four throttle controls (-n, -ns, --session-cap, -w). See Advanced in Crawl for full session and throttling documentation.

Database pre-query (STATEMENT)

If your crawler needs to know what records already exist in the database before crawling (e.g. to skip already-ingested records), define STATEMENT as a SQL query. The result rows are available as scheduler.querier.data inside seed(). This is currently supported only for db = cha or db = pro.

STATEMENT = "select rid from pro_bc_seen;"

def seed(scheduler):
    seen = {row.rid for row in scheduler.querier.data}
    ...

Custom scheduler

For complex multi-phase crawls (e.g. crawling tenders then awards), you can write a custom scheduler singleton alongside your module. See src/pipeline/crawl/crawlers/pro/bc/scheduler.py as an example.

Signals in depth

See SKIP and STOP in Crawl for a full explanation with a real crawler example. In brief: "__SKIP" discards the node but follows its edges; "__STOP" discards both.

Worked example — tst/w3

The test crawler at src/pipeline/crawl/crawlers/tst/w3/__init__.py is the canonical minimal implementation. It crawls the W3C blog (10 pages, paginated GET).

BASE_URL = "https://www.w3.org/blog/page/"
PAGE_SIZE = 10


def seed(scheduler: Scheduler) -> tree.Edge | None:
    pn = scheduler.page_number      # starts at 1; advances as records are indexed
    if pn > 10:
        return None                 # exhausted — end the crawl
    return tree.Edge(
        label="blogs",              # this label flows into sections() and parse()
        req=http.Request(method="GET", url=urljoin(BASE_URL, str(pn))),
        p_rid="",
        p_rdate=datetime(1970, 1, 1),  # naive epoch; same value as the deprecated utcfromtimestamp(0)
    )


def sections(data: tree.Data) -> list[tree.Data]:
    label, text = data
    match label:
        case "blogs":
            # split the results page into one Data per article
            return [
                tree.Data("blog_hed", etree.tostring(elem).decode())  # decode: Data.data is a str
                for elem in etree.HTML(text).xpath("//article")
            ]
        case _:
            return [data]           # detail pages pass through unchanged


def parse(
    data: tree.Data, p_rid: str, p_rdate: datetime
) -> tuple[str, datetime, list[tree.Edge]]:
    label, text = data
    match label:
        case "blog_hed":
            root = etree.HTML(text)
            rid = root.xpath("//article/@id")[0]
            rdate = datetime.fromisoformat(root.xpath("//time/@datetime")[0])
            href = root.xpath("//h2/a[contains(@href, 'w3.org/blog/')]/@href")[0]
            return (
                rid,
                rdate,
                [tree.Edge(
                    label="blog",
                    req=http.Request(method="GET", url=href),
                    p_rid=rid,
                    p_rdate=rdate,
                )],
            )
        case _:
            return p_rid, p_rdate, []  # detail page: inherit parent's rid/rdate

Trace through one iteration

  1. seed() returns an Edge(label="blogs", ...) pointing to page 1 of the blog listing.

  2. The engine fetches that URL and calls sections(Data("blogs", <html>)), which returns a list of Data("blog_hed", <article html>) — one per article on the page.

  3. For each section the engine calls parse(Data("blog_hed", ...), "", epoch). parse extracts rid and rdate from the article element and returns a child Edge(label="blog", ...) pointing to the article detail page.

  4. The detail page arrives as Data("blog", <html>). sections passes it through. parse returns p_rid, p_rdate, [] — inheriting the rid from step 3, no further edges.

  5. Once the input queue empties, seed() is called again. scheduler.page_number has advanced (it reflects the highest index seen), so the next page URL is returned.

  6. After page 10, pn > 10 is true and seed() returns None — crawl ends.
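
The trace above can be mimicked with a toy driver. This is not the real engine (which is asynchronous and writes to storage); it is a sketch that only reproduces the call order and the signal handling described in this document. The crawl function, the fake PAGES site, and the stand-in types are all hypothetical.

```python
from collections import deque
from datetime import datetime
from typing import NamedTuple


class Request(NamedTuple):  # stand-in for http.Request
    method: str
    url: str


class Edge(NamedTuple):  # stand-in for tree.Edge
    label: str
    req: Request
    p_rid: str
    p_rdate: datetime


class Data(NamedTuple):  # stand-in for tree.Data
    label: str
    data: str


def crawl(seed, sections, parse, fetch, scheduler=None) -> list[tuple[str, str]]:
    """Toy driver: return (rid, url) for every node that would be stored."""
    stored = []
    while (first := seed(scheduler)) is not None:
        queue = deque(first if isinstance(first, list) else [first])
        while queue:  # seed() is called again only once the queue drains
            edge = queue.popleft()
            page = Data(edge.label, fetch(edge.req))
            for section in sections(page):
                rid, rdate, edges = parse(section, edge.p_rid, edge.p_rdate)
                if rid == "__STOP":
                    continue  # drop the node and its children
                if rid != "__SKIP":
                    stored.append((rid, edge.req.url))
                queue.extend(edges)
    return stored


# A two-record fake site and a matching crawler.
PAGES = {"/list": "a\nb", "/a": "detail of a", "/b": "detail of b"}
calls = {"n": 0}


def seed(_scheduler):
    calls["n"] += 1
    if calls["n"] > 1:
        return None  # one listing page only
    return Edge("list", Request("GET", "/list"), "", datetime(1970, 1, 1))


def sections(data):
    label, text = data
    return [Data("row", line) for line in text.splitlines()] if label == "list" else [data]


def parse(data, p_rid, p_rdate):
    label, text = data
    if label == "row":
        return text, p_rdate, [Edge("detail", Request("GET", f"/{text}"), text, p_rdate)]
    return p_rid, p_rdate, []  # detail page inherits the parent's rid


stored = crawl(seed, sections, parse, fetch=lambda req: PAGES[req.url])
```

Running it stores each listing row and then each detail page under the rid extracted from its row, mirroring steps 1 to 6.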

Contributor checklist

Before submitting:

  • Module is at src/pipeline/crawl/crawlers/<db>/<s>/__init__.py

  • db and s are valid members of DB and S in src/pipeline/types.py

  • __init__.py exports seed, sections, parse with the correct signatures

  • PAGE_SIZE is set if the site paginates

  • EARLIEST_DATE is set if the site supports historical queries

  • No hard-coded credentials — use environment variables / .env

  • Crawler runs without error in debug dated mode: pipe -d <db> <s> crawl -f <date>

  • At least one record is stored after a short run

  • Historical mode works (if applicable): pipe -d <db> <s> crawl -H