Contributing a new crawler
This document is a protocol. Follow it end-to-end and you will produce a working crawler that integrates with the IJF pipeline. Crawl is the companion reference document — it explains the conceptual foundations (trees, algorithm, full API) behind the steps prescribed here. Read it alongside this protocol for the why behind each requirement.
Background
Key terms
| Term | Meaning |
|---|---|
| db | The target database. |
| s | The jurisdiction. |
| rid | A unique identifier for one record. Every page stored by the crawler is assigned to a record via its rid. |
| rdate | The date associated with a record. |
Valid values for db and s are defined as StrEnums in src/pipeline/types.py.
The tree model
The crawl engine maintains a sitemap tree (pages as nodes, HTTP requests as edges) and
maps it onto a record tree where every node carries an rid and rdate. Your
crawler’s job is to traverse the sitemap and assign that metadata along the way. Full
explanation — with diagrams and a worked example — is in the Trees section
of Crawl.
Directory layout
Create a Python package at:
src/pipeline/crawl/crawlers/<db>/<s>/__init__.py
The db and s folder names must match the lowercase enum member names in
types.py exactly. The apt-legacy directory is a historical exception; all new
crawlers use the plain db name.
Your package must export three functions (see The three required functions). Everything else is optional:
crawlers/
    lob/
        fd/
            __init__.py    ← required: seed, sections, parse
            adv.json       ← optional: HTTP request template
            rcnt.json
    pro/
        bc/
            __init__.py
            scheduler.py   ← optional: custom scheduler subclass
            utils.py
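The naming rule above can be sketched as a small path helper. This is only an illustration: the DB and S classes below are hypothetical stand-ins for the real enums in types.py (written as str-based Enums so the sketch also runs on Python versions without StrEnum), and their members are limited to the names that appear in the layout above.

```python
from enum import Enum
from pathlib import Path


class DB(str, Enum):
    """Hypothetical stand-in for the DB enum in src/pipeline/types.py."""
    LOB = "lob"
    PRO = "pro"


class S(str, Enum):
    """Hypothetical stand-in for the S enum in src/pipeline/types.py."""
    FD = "fd"
    BC = "bc"


CRAWLERS_ROOT = Path("src/pipeline/crawl/crawlers")


def crawler_package(db: DB, s: S) -> Path:
    """Build the expected __init__.py path from the lowercase enum member names."""
    return CRAWLERS_ROOT / db.name.lower() / s.name.lower() / "__init__.py"


print(crawler_package(DB.LOB, S.FD).as_posix())
```

If the path printed for your db and s does not match where you created the package, the folder names are the first thing to check.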
Imports
from datetime import datetime
from pipeline.crawl import tree
from pipeline.crawl.scheduler import Scheduler
from pipeline.utils import http
tree holds Edge, Data, and the signal strings.
http.Request is the HTTP request object.
Scheduler is passed into seed() at runtime.
The three required functions
Every __init__.py must define seed, sections, and parse. The engine calls
them in that order for each crawl iteration.
seed
def seed(scheduler: Scheduler) -> tree.Edge | list[tree.Edge] | None:
    ...
Role: produce the starting HTTP request(s) for one iteration of the crawl.
- Called once per crawl iteration (once the input queue drains).
- Return an Edge or list of Edges to start the next batch.
- Return None to signal exhaustion; the crawl ends.
- Use scheduler to decide what to request next (which page, which date range).
Key scheduler attributes available inside seed:
| Attribute | Type | Meaning |
|---|---|---|
| | | |
| | | Number of times |
| scheduler.page_number | | Current page number (derived from highest index seen) |
| scheduler.indexer.page_start | | Index of the first record on the current page |
| | | Highest record index seen |
| scheduler.calendar.from_date | | Start of the requested date range |
| scheduler.calendar.to_date | | End of the requested date range |
Minimal example — paginated GET:
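# BASE_URL and MAX_PAGES are module-level constants defined by your crawler.
def seed(scheduler: Scheduler) -> tree.Edge | None:
    pn = scheduler.page_number
    if pn > MAX_PAGES:
        return None  # no more pages: end the crawl
    return tree.Edge(
        label="results",  # routing key seen later by sections() and parse()
        req=http.Request(method="GET", url=f"{BASE_URL}?page={pn}"),
        p_rid="",  # the root edge has no parent record
        p_rdate=datetime.utcfromtimestamp(0),
    )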
Returning None from seed immediately ends the crawl.
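The relationship between record indices and pages can be sketched with plain integer arithmetic. The formulas below are an assumption made for illustration (zero-based record indices, one-based page numbers, a fixed page size); the real indexer may compute these values differently, and the function names are hypothetical.

```python
def page_number(records_seen: int, page_size: int) -> int:
    """One-based number of the page to request next, given how many records were indexed."""
    return records_seen // page_size + 1


def page_start(page: int, page_size: int) -> int:
    """Zero-based index of the first record on a one-based page."""
    return (page - 1) * page_size


# With a page size of 10: nothing indexed yet means page 1,
# and a full first page moves the crawl on to page 2.
print(page_number(0, 10), page_number(10, 10), page_start(3, 25))
```

Under these assumptions, a seed function that reads the page number on every call advances through the site without keeping any state of its own.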
sections
def sections(data: tree.Data) -> list[tree.Data]:
    ...
Role: split one crawled page into individually-parseable chunks.
Data is a NamedTuple(label: str, data: str). Unpack it as:
label, text = data
- If the page contains multiple records (e.g. a search-results list), split it into one Data per record and return that list.
- If the page describes only one record (e.g. a detail page), return [data] unchanged.
- Use label to route: different page types need different splitting logic.
Example — split HTML table rows:
from lxml import etree  # etree.HTML is the lxml HTML parser

def sections(data: tree.Data) -> list[tree.Data]:
    label, text = data
    match label:
        case "results":
            # one Data per table row; decode so the payload stays a str
            rows = etree.HTML(text).xpath("//tr[@class='record-row']")
            return [tree.Data("row", etree.tostring(r).decode()) for r in rows]
        case _:
            return [data]
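For experimenting with the splitting logic outside the pipeline, the same idea can be sketched with the standard library only. The Data class is a stand-in for tree.Data, the sample markup is invented, and xml.etree.ElementTree replaces lxml here, so this variant only works on well-formed markup.

```python
import xml.etree.ElementTree as ET
from typing import NamedTuple


class Data(NamedTuple):
    """Stand-in for tree.Data: a label plus the page text."""
    label: str
    data: str


def sections(data: Data) -> list[Data]:
    label, text = data
    if label == "results":
        # Keep only the rows that hold records, one Data per row.
        rows = ET.fromstring(text).iter("tr")
        return [
            Data("row", ET.tostring(r, encoding="unicode"))
            for r in rows
            if r.get("class") == "record-row"
        ]
    return [data]  # single-record pages pass through unchanged


PAGE = (
    "<table>"
    "<tr class='header'><th>ID</th></tr>"
    "<tr class='record-row'><td>A-1</td></tr>"
    "<tr class='record-row'><td>A-2</td></tr>"
    "</table>"
)
chunks = sections(Data("results", PAGE))
print(len(chunks), [c.label for c in chunks])
```

Each returned chunk is relabelled, so parse can tell a single row apart from the full results page it came from.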
parse
def parse(
    data: tree.Data, p_rid: str, p_rdate: datetime
) -> tuple[str, datetime, list[tree.Edge]]:
    ...
Role: extract the rid, rdate, and child edges from one section.
Return a 3-tuple (rid, rdate, edges):
| Return value | Meaning |
|---|---|
| rid | This record’s identifier (unhashed plain string). See signals below. |
| rdate | The date of this record. |
| edges | Child HTTP requests to follow from this node. |
Inheriting the parent. If this node is not itself a record holder (e.g. a detail page
that belongs to an rid already extracted from the results page), pass the parent
values through:
return p_rid, p_rdate, []
Signals. Return one of two special strings as rid to control processing:
| Signal | Effect |
|---|---|
| "__SKIP" | Node is not saved to storage, but its child edges are followed. Use for intermediate pages that carry no record data but link to pages that do. |
| "__STOP" | Node is not saved and its child edges are not followed. Use to discard known-bad or irrelevant nodes. |
Example:
from urllib.parse import urljoin

from lxml import etree

def parse(
    data: tree.Data, p_rid: str, p_rdate: datetime
) -> tuple[str, datetime, list[tree.Edge]]:
    label, text = data
    match label:
        case "row":
            # a results row holds the record: extract its id, date and detail link
            root = etree.HTML(text)
            rid = root.xpath("//td[@class='record-id']/text()")[0].strip()
            rdate = datetime.fromisoformat(root.xpath("//td[@class='date']/text()")[0].strip())
            href = root.xpath("//a/@href")[0]
            return (
                rid,
                rdate,
                [tree.Edge(
                    label="detail",
                    req=http.Request(method="GET", url=urljoin(BASE_URL, href)),
                    p_rid=rid,
                    p_rdate=rdate,
                )],
            )
        case "detail":
            # the detail page belongs to the row's record: inherit the parent values
            return p_rid, p_rdate, []
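The effect of the two signals can be sketched as a small decision function. This is a toy model of what the text describes, not the engine's code: the handle function and its return shape are hypothetical, and edges are reduced to plain strings.

```python
SKIP = "__SKIP"
STOP = "__STOP"


def handle(rid: str, edges: list[str]) -> tuple[bool, list[str]]:
    """Return (save_node, edges_to_follow) for one parsed section."""
    if rid == STOP:
        return False, []      # discard the node and its children
    if rid == SKIP:
        return False, edges   # discard the node, keep following its children
    return True, edges        # ordinary record: save it and follow its children


print(handle("A-1", ["detail"]))
print(handle(SKIP, ["detail"]))
print(handle(STOP, ["detail"]))
```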
The Edge type
An Edge connects a parent node to a child via an HTTP Request. The label you
assign is what data.label will be inside sections() and parse() when that
response is processed — labels are your routing key. Full API for both Edge and
Request is in the Edge and Request section of Crawl.
Optional module-level attributes
Define these at the top of __init__.py to configure the scheduler:
| Name | Type | Default | Effect |
|---|---|---|---|
| PAGE_SIZE | | | Page size used by the indexer |
| EARLIEST_DATE | | | Lower bound for historical crawls |
| STATEMENT | | | SQL query executed before the crawl starts; result available as scheduler.querier.data |
Example:
from datetime import datetime
PAGE_SIZE = 25
EARLIEST_DATE = datetime(2015, 3, 1)
STATEMENT is only supported when db is cha or pro — the scheduler only
constructs a Querier for those databases.
Scheduler and runtime modes
The Scheduler is passed to seed() and tracks the crawl’s progress. Its runtime
property tells you which mode the user invoked:
| Runtime | CLI flag | Meaning |
|---|---|---|
| Runtime.HIST | -H | Retrieve all records ever, starting from EARLIEST_DATE |
| Runtime.DATE | -f / -t | Retrieve records within a date range (default mode) |
| Runtime.IDX | | Retrieve records between explicit index bounds |
Import Runtime to branch on it:
from pipeline.crawl.scheduler.runtime import Runtime

def seed(scheduler):
    if scheduler.runtime == Runtime.HIST:
        # use scheduler.indexer.page_start for offset-based pagination
        ...
    else:
        # use scheduler.calendar.from_date / to_date
        ...
DATE is the default for normal runs (yesterday → today). Most crawlers only need to
handle DATE and HIST; IDX is rare.
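To make the branching concrete, here is a self-contained sketch that builds a request URL per mode. Everything in it is a stand-in invented for illustration: the Runtime enum, the flattened FakeScheduler, the example.org URL and the offset/from/to query parameters are not the pipeline's real objects or any real site's API.

```python
from dataclasses import dataclass
from datetime import date
from enum import Enum, auto
from urllib.parse import urlencode


class Runtime(Enum):
    """Stand-in for pipeline.crawl.scheduler.runtime.Runtime."""
    HIST = auto()
    DATE = auto()
    IDX = auto()


@dataclass
class FakeScheduler:
    """Flattened stand-in for the attributes a seed function reads."""
    runtime: Runtime
    page_start: int = 0
    from_date: date = date(2024, 1, 1)
    to_date: date = date(2024, 6, 30)


BASE_URL = "https://example.org/search"  # hypothetical endpoint


def seed_url(scheduler: FakeScheduler) -> str:
    if scheduler.runtime is Runtime.HIST:
        # historical crawl: page through everything by record offset
        query = {"offset": scheduler.page_start}
    else:
        # dated crawl: ask only for the requested window
        query = {
            "from": scheduler.from_date.isoformat(),
            "to": scheduler.to_date.isoformat(),
        }
    return f"{BASE_URL}?{urlencode(query)}"


print(seed_url(FakeScheduler(Runtime.HIST, page_start=50)))
print(seed_url(FakeScheduler(Runtime.DATE)))
```

In a real crawler the returned URL would be wrapped in an http.Request and a tree.Edge, as in the seed examples earlier in this protocol.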
CLI usage
Once your module is in place, run it with:
# Dated mode — records updated in the last day (default)
pipe <db> <s> crawl
# Dated mode — specific range
pipe <db> <s> crawl -f 2024-01-01 -t 2024-06-30
# Historical — everything from EARLIEST_DATE to now
pipe <db> <s> crawl -H
Useful flags during development:
| Flag | Effect |
|---|---|
| -d | Debug mode: verbose logging, single worker |
| | Throttle to N concurrent requests |
| | Wait between requests |
| | Limit total requests per session |
Full example for a new crawler in debug mode:
pipe -d lob on crawl -f 2024-01-01
Advanced topics
Database pre-query (STATEMENT)
If your crawler needs to know what records already exist in the database before crawling
(e.g. to skip already-ingested records), define STATEMENT as a SQL query. The result
rows are available as scheduler.querier.data inside seed(). This is currently
supported only for db = cha or db = pro.
STATEMENT = "select rid from pro_bc_seen;"

def seed(scheduler):
    seen = {row.rid for row in scheduler.querier.data}
    ...
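One way to use the pre-query result is to filter candidate record ids against the set of known ones. The sketch below assumes each result row exposes a rid attribute, as in the snippet above; the Row type, the unseen helper and the sample ids are hypothetical.

```python
from collections import namedtuple

# Stand-in for the rows the pre-query returns; only the rid column is used.
Row = namedtuple("Row", ["rid"])


def unseen(candidates: list[str], rows: list[Row]) -> list[str]:
    """Keep the candidate rids that the pre-query did not return, in order."""
    seen = {row.rid for row in rows}  # set lookup keeps the filter cheap
    return [rid for rid in candidates if rid not in seen]


rows = [Row("T-100"), Row("T-101")]
print(unseen(["T-100", "T-102", "T-101", "T-103"], rows))
```

Building the set once and reusing it is worthwhile here because the list of candidates is usually checked many times during a crawl.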
Custom scheduler
For complex multi-phase crawls (e.g. crawling tenders then awards), you can write a custom
scheduler singleton alongside your module. See
src/pipeline/crawl/crawlers/pro/bc/scheduler.py as an example.
Signals in depth
See SKIP and STOP in Crawl for a full explanation with a real
crawler example. In brief: "__SKIP" discards the node but follows its edges;
"__STOP" discards both.
Worked example — tst/w3
The test crawler at src/pipeline/crawl/crawlers/tst/w3/__init__.py is the canonical
minimal implementation. It crawls the W3C blog (10 pages, paginated GET).
# Needed in addition to the imports listed under Imports.
from urllib.parse import urljoin

from lxml import etree

BASE_URL = "https://www.w3.org/blog/page/"
PAGE_SIZE = 10

def seed(scheduler: Scheduler) -> tree.Edge | None:
    pn = scheduler.page_number  # starts at 1; advances as records are indexed
    if pn > 10:
        return None  # exhausted: end the crawl
    return tree.Edge(
        label="blogs",  # this label flows into sections() and parse()
        req=http.Request(method="GET", url=urljoin(BASE_URL, str(pn))),
        p_rid="",
        p_rdate=datetime.utcfromtimestamp(0),
    )

def sections(data: tree.Data) -> list[tree.Data]:
    label, text = data
    match label:
        case "blogs":
            # split the results page into one Data per article;
            # decode because etree.tostring returns bytes and Data.data is a str
            return [
                tree.Data("blog_hed", etree.tostring(elem).decode())
                for elem in etree.HTML(text).xpath("//article")
            ]
        case _:
            return [data]  # detail pages pass through unchanged

def parse(
    data: tree.Data, p_rid: str, p_rdate: datetime
) -> tuple[str, datetime, list[tree.Edge]]:
    label, text = data
    match label:
        case "blog_hed":
            root = etree.HTML(text)
            rid = root.xpath("//article/@id")[0]
            rdate = datetime.fromisoformat(root.xpath("//time/@datetime")[0])
            href = root.xpath("//h2/a[contains(@href, 'w3.org/blog/')]/@href")[0]
            return (
                rid,
                rdate,
                [tree.Edge(
                    label="blog",
                    req=http.Request(method="GET", url=href),
                    p_rid=rid,
                    p_rdate=rdate,
                )],
            )
        case _:
            return p_rid, p_rdate, []  # detail page: inherit parent's rid/rdate
Trace through one iteration
1. seed() returns an Edge(label="blogs", ...) pointing to page 1 of the blog listing.
2. The engine fetches that URL and calls sections(Data("blogs", <html>)), which returns a list of Data("blog_hed", <article html>), one per article on the page.
3. For each section the engine calls parse(Data("blog_hed", ...), "", epoch). parse extracts rid and rdate from the article element and returns a child Edge(label="blog", ...) pointing to the article detail page.
4. The detail page arrives as Data("blog", <html>). sections passes it through. parse returns p_rid, p_rdate, [], inheriting the rid from step 3 with no further edges.
5. Once the input queue empties, seed() is called again. scheduler.page_number has advanced (it reflects the highest index seen), so the next page URL is returned.
6. After page 10, pn > 10 is true and seed() returns None, so the crawl ends.
Contributor checklist
Before submitting:
- Module is at src/pipeline/crawl/crawlers/<db>/<s>/__init__.py
- db and s are valid members of DB and S in src/pipeline/types.py
- __init__.py exports seed, sections, parse with the correct signatures
- PAGE_SIZE is set if the site paginates
- EARLIEST_DATE is set if the site supports historical queries
- No hard-coded credentials: use environment variables / .env
- Crawler runs without error in debug dated mode: pipe -d <db> <s> crawl -f <date>
- At least one record is stored after a short run
- Historical mode works (if applicable): pipe -d <db> <s> crawl -H
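The export check in the list above can be partly automated before running the crawler. The sketch below only verifies that the three functions exist and take the expected number of parameters; the missing_exports helper is hypothetical and not part of the pipeline, and the fake module stands in for a real crawler package.

```python
import inspect
import types

# Expected parameter counts: seed(scheduler), sections(data), parse(data, p_rid, p_rdate)
REQUIRED = {"seed": 1, "sections": 1, "parse": 3}


def missing_exports(module) -> list[str]:
    """List problems with the three required functions of a crawler module."""
    problems = []
    for name, n_params in REQUIRED.items():
        fn = getattr(module, name, None)
        if not callable(fn):
            problems.append(f"{name}: missing")
        elif len(inspect.signature(fn).parameters) != n_params:
            problems.append(f"{name}: expected {n_params} parameter(s)")
    return problems


# Usage with a throwaway module that forgot to define parse.
fake = types.ModuleType("fake_crawler")
fake.seed = lambda scheduler: None
fake.sections = lambda data: [data]
print(missing_exports(fake))
```

An empty list means the module has the right shape; it says nothing about whether the functions return the right values, so the debug runs in the checklist are still needed.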