# dhuser: Initial LREC LLM-as-Annotator app (a918698)
"""AnnotationSchema: presets, JSON Schema builder, output validator.
A single internal representation handles POS, NER, lemmatization, and any
user-defined per-token task. See README for the contract.
"""
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass, field, asdict
from typing import Any, Optional
from jsonschema import Draft202012Validator
from paths import TAGSETS_DIR, TUTORIAL_SCHEMAS_DIR, LANGUAGES, read_text
# Aggregator identifiers; presumably the accepted values of
# ``Field.aggregator`` (nothing in this module enforces it) -- TODO confirm.
AGGREGATORS = [
    "vote",
    "lcs",
    "min",
    "max",
    "priority",
    "vote_per_subfield",
]
# Kinds of field a schema can declare (mirrors the ``Field.type`` comment).
FIELD_TYPES = [
    "string",
    "enum",
    "object",
]
@dataclass
class Subfield:
    """One named entry inside an ``"object"``-typed field.

    ``values`` optionally restricts the entry to a closed set of strings;
    leaving it as ``None`` means any string is accepted.
    """

    name: str
    # None => free string; otherwise the list of allowed string values.
    values: Optional[list[str]] = None
    nullable: bool = True
@dataclass
class Field:
    """One per-token annotation field of a schema.

    ``values`` is only meaningful for ``"enum"`` fields and ``subfields`` only
    for ``"object"`` fields; both default to a fresh empty list.
    """

    name: str
    # One of "string", "enum" or "object".
    type: str
    # Allowed values (enum fields).
    values: list[str] = field(default_factory=list)
    nullable: bool = True
    aggregator: str = "vote"
    # Nested entries (object fields).
    subfields: list[Subfield] = field(default_factory=list)
@dataclass
class AnnotationSchema:
    """Description of one per-token annotation task.

    Carries the task name, the ordered list of fields, and optional
    ``language`` / ``description`` text (both default to the empty string).
    """

    task_name: str
    fields: list[Field]
    language: str = ""
    description: str = ""

    def to_dict(self) -> dict:
        """Return the schema as a plain dict, with each field serialised."""
        serialised_fields = []
        for item in self.fields:
            serialised_fields.append(_field_to_dict(item))
        return dict(
            task_name=self.task_name,
            language=self.language,
            description=self.description,
            fields=serialised_fields,
        )

    def hash(self) -> str:
        """Return a short fingerprint of the schema.

        The fingerprint is the first 10 hex characters of the SHA-1 digest of
        the key-sorted JSON form of ``to_dict()``.
        """
        canonical = json.dumps(self.to_dict(), sort_keys=True, ensure_ascii=False)
        digest = hashlib.sha1(canonical.encode("utf-8"))
        return digest.hexdigest()[:10]
def _field_to_dict(f: Field) -> dict:
    """Convert a ``Field`` into a plain dict.

    ``name``, ``type``, ``nullable`` and ``aggregator`` are always present.
    ``values`` is added only for ``"enum"`` fields and ``subfields`` only for
    ``"object"`` fields.

    The result shares no mutable state with ``f``: ``values`` is copied and
    each subfield goes through ``asdict`` (which copies recursively), so
    editing the returned dict never alters the schema it came from.
    """
    d: dict[str, Any] = {
        "name": f.name,
        "type": f.type,
        "nullable": f.nullable,
        "aggregator": f.aggregator,
    }
    if f.type == "enum":
        # Copy: handing out the Field's own list would let callers mutate the
        # schema through the serialised form.
        d["values"] = list(f.values)
    if f.type == "object":
        d["subfields"] = [asdict(s) for s in f.subfields]
    return d
def _field_from_dict(d: dict) -> Field:
    """Build a ``Field`` from its dict form.

    ``name`` and ``type`` are required keys. ``values`` and ``subfields``
    default to empty, ``nullable`` to ``True`` and ``aggregator`` to
    ``"vote"``. An explicit ``None`` (JSON ``null``) for ``values`` or
    ``subfields`` is treated the same as a missing key.

    Raises:
        KeyError: if ``name`` or ``type`` is missing from ``d``.
        TypeError: if a subfield dict lacks ``name`` or carries a key that
            ``Subfield`` does not accept.
    """
    # ``or []`` also covers an explicit null, which ``.get(key, [])`` would
    # pass through as None and fail on iteration.
    subs = [Subfield(**s) for s in d.get("subfields") or []]
    return Field(
        name=d["name"],
        type=d["type"],
        # Copy so the Field does not share its list with the input dict.
        values=list(d.get("values") or []),
        nullable=d.get("nullable", True),
        aggregator=d.get("aggregator", "vote"),
        subfields=subs,
    )
def schema_from_dict(d: dict) -> AnnotationSchema:
    """Build an ``AnnotationSchema`` from its dict form.

    Every key is optional: ``task_name`` defaults to ``"custom"``,
    ``language`` and ``description`` to the empty string, and ``fields`` to
    an empty list.
    """
    parsed_fields = []
    for raw_field in d.get("fields", []):
        parsed_fields.append(_field_from_dict(raw_field))
    task_name = d.get("task_name", "custom")
    language = d.get("language", "")
    description = d.get("description", "")
    return AnnotationSchema(
        task_name=task_name,
        fields=parsed_fields,
        language=language,
        description=description,
    )
# ---------------------------------------------------------------------------
# JSON Schema emission (Draft 2020-12) — matches pos_lemma_morph_schema.json
# ---------------------------------------------------------------------------
def to_json_schema(s: AnnotationSchema) -> dict:
    """Return the Draft 2020-12 JSON Schema for one annotated sentence.

    The document is an object with required ``sentence_id``, ``language`` and
    ``tokens``. Each token is an object that must carry ``surface`` plus every
    field of ``s``; unknown keys are rejected at both levels.
    """
    specs = list(s.fields)
    token_properties: dict[str, Any] = {
        "surface": {"type": "string"},
        **{spec.name: _field_to_json(spec) for spec in specs},
    }
    token_schema = {
        "type": "object",
        # Every declared field is mandatory on every token.
        "required": ["surface", *[spec.name for spec in specs]],
        "additionalProperties": False,
        "properties": token_properties,
    }
    document_properties = {
        "sentence_id": {"type": "string"},
        "language": {"type": "string"},
        "tokens": {"type": "array", "items": token_schema},
    }
    return {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": f"{s.task_name} annotation",
        "type": "object",
        "required": ["sentence_id", "language", "tokens"],
        "additionalProperties": False,
        "properties": document_properties,
    }
def _field_to_json(f: Field) -> dict:
    """Return the JSON Schema fragment for one field's value.

    * ``"enum"``: a string restricted to ``f.values``.
    * ``"object"``: an object with one string property per subfield and no
      other keys allowed; a subfield is restricted to its ``values`` only when
      that list is non-empty.
    * anything else: a plain string.

    Where a field or subfield is nullable, ``"null"`` is added to the type
    and ``None`` to any enum. ``f.nullable`` is not consulted for
    ``"object"`` fields.
    """

    def string_type(nullable):
        # Nullable strings are expressed as a two-member type union.
        return ["string", "null"] if nullable else "string"

    def choices(values, nullable):
        # None must be listed in the enum too, otherwise null is rejected.
        return [*values, None] if nullable else [*values]

    if f.type == "object":
        sub_props = {}
        for sub in f.subfields:
            spec: dict[str, Any] = {"type": string_type(sub.nullable)}
            if sub.values:
                spec["enum"] = choices(sub.values, sub.nullable)
            sub_props[sub.name] = spec
        return {"type": "object", "additionalProperties": False, "properties": sub_props}

    fragment: dict[str, Any] = {"type": string_type(f.nullable)}
    if f.type == "enum":
        fragment["enum"] = choices(f.values, f.nullable)
    return fragment
def validate(s: AnnotationSchema, payload: dict) -> tuple[bool, list[str]]:
    """Check ``payload`` against the JSON Schema derived from ``s``.

    Returns:
        ``(ok, errors)``, where ``ok`` is ``True`` when no error was found
        and each error reads ``"<path>: <message>"``, the path being the
        location inside ``payload`` with its components joined by ``/``.
    """
    checker = Draft202012Validator(to_json_schema(s))
    messages = []
    for problem in checker.iter_errors(payload):
        location = "/".join(str(part) for part in problem.path)
        messages.append(f"{location}: {problem.message}")
    return (not messages, messages)
# ---------------------------------------------------------------------------
# Presets
# ---------------------------------------------------------------------------
def _parse_tagset(lang_code: str) -> list[str]:
    """Return the tags listed in ``<lang_code>_tagset.txt`` under ``TAGSETS_DIR``.

    Each non-blank line contributes its first whitespace-delimited token, in
    file order; the rest of the line (the description) is discarded.
    """
    content = read_text(TAGSETS_DIR / f"{lang_code}_tagset.txt")
    stripped_lines = (raw.strip() for raw in content.splitlines())
    # Lines look like "TAG Description" or "TAG\tDescription".
    return [entry.split(None, 1)[0] for entry in stripped_lines if entry]
def _ud_preset() -> AnnotationSchema:
    """Build the UD preset from the tutorial's ``pos_lemma_morph_schema.json``.

    The UPOS enum values and the morphological feature names are read from
    that file. Only the feature *names* are used: every feature becomes a
    nullable free-string subfield, and its spec in the file is not consulted.
    """
    source = read_text(TUTORIAL_SCHEMAS_DIR / "pos_lemma_morph_schema.json")
    per_token = json.loads(source)["properties"]["tokens"]["items"]["properties"]
    upos_tags = per_token["upos"]["enum"]
    morph_features = [
        Subfield(name=feature_name, values=None, nullable=True)
        for feature_name, _spec in per_token["features"]["properties"].items()
    ]

    lemma = Field(name="lemma", type="string", nullable=True, aggregator="lcs")
    upos = Field(name="upos", type="enum", values=upos_tags, nullable=False, aggregator="vote")
    features = Field(
        name="features",
        type="object",
        subfields=morph_features,
        aggregator="vote_per_subfield",
    )
    confidence = Field(
        name="confidence",
        type="enum",
        values=["low", "medium", "high"],
        nullable=False,
        aggregator="min",
    )
    comment = Field(name="comment", type="string", nullable=True, aggregator="priority")
    return AnnotationSchema(
        task_name="UD POS + lemma + morphology",
        description="Universal Dependencies UPOS + morphological features.",
        fields=[lemma, upos, features, confidence, comment],
    )
def _tagset_preset(lang_code: str) -> AnnotationSchema:
    """Build a POS + lemma preset whose POS enum is the language's tagset file.

    The display name of the language is looked up in ``LANGUAGES``; when the
    code has no entry there, the code itself is used instead.
    """
    language_name = LANGUAGES.get(lang_code, lang_code)
    lemma = Field(name="lemma", type="string", nullable=True, aggregator="lcs")
    pos = Field(
        name="pos",
        type="enum",
        values=_parse_tagset(lang_code),
        nullable=False,
        aggregator="vote",
    )
    confidence = Field(
        name="confidence",
        type="enum",
        values=["low", "medium", "high"],
        nullable=False,
        aggregator="min",
    )
    comment = Field(name="comment", type="string", nullable=True, aggregator="priority")
    return AnnotationSchema(
        task_name=f"{lang_code} POS + lemma (bespoke tagset)",
        language=language_name,
        description=f"Bespoke compound POS tags for {language_name}.",
        fields=[lemma, pos, confidence, comment],
    )
def _ner_preset() -> AnnotationSchema:
    """Build the NER preset: a BIO tag, a confidence rating and a comment."""
    bio_tags = ["O", "B-PER", "I-PER", "B-LOC", "I-LOC", "B-ORG", "I-ORG", "B-MISC", "I-MISC"]
    ner = Field(name="ner", type="enum", values=bio_tags, nullable=False, aggregator="vote")
    confidence = Field(
        name="confidence",
        type="enum",
        values=["low", "medium", "high"],
        nullable=False,
        aggregator="min",
    )
    comment = Field(name="comment", type="string", nullable=True, aggregator="priority")
    return AnnotationSchema(
        task_name="Named Entity Recognition (BIO)",
        description="Per-token BIO tags for PER / LOC / ORG / MISC.",
        fields=[ner, confidence, comment],
    )
def _lemma_only_preset() -> AnnotationSchema:
    """Build the lemmatization preset: a free-string lemma plus a confidence rating."""
    lemma = Field(name="lemma", type="string", nullable=True, aggregator="lcs")
    confidence = Field(
        name="confidence",
        type="enum",
        values=["low", "medium", "high"],
        nullable=False,
        aggregator="min",
    )
    return AnnotationSchema(
        task_name="Lemmatization only",
        description="One lemma per token.",
        fields=[lemma, confidence],
    )
def _custom_preset() -> AnnotationSchema:
    """Build the starter schema for a user-defined task.

    It holds a two-value placeholder ``label`` enum and a confidence rating.
    """
    label = Field(
        name="label",
        type="enum",
        values=["LABEL_A", "LABEL_B"],
        nullable=False,
        aggregator="vote",
    )
    confidence = Field(
        name="confidence",
        type="enum",
        values=["low", "medium", "high"],
        nullable=False,
        aggregator="min",
    )
    return AnnotationSchema(
        task_name="Custom task",
        description="Define your own fields in the Schema tab.",
        fields=[label, confidence],
    )
# Registry of built-in schemas: preset key -> (display label, zero-argument
# factory). Factories run only when a preset is requested, so no tagset or
# tutorial file is read when this dict is built.
PRESETS = {
    "ud_upos_morph": (
        "UD UPOS + morphology (UD-standard)",
        _ud_preset,
    ),
    # Language-specific tagsets, one per language code.
    "grc_tagset": (
        "Ancient Greek — bespoke tagset (GRC)",
        lambda: _tagset_preset("GRC"),
    ),
    "hye_tagset": (
        "Old Armenian — bespoke tagset (HYE)",
        lambda: _tagset_preset("HYE"),
    ),
    "kat_tagset": (
        "Old Georgian — bespoke tagset (KAT)",
        lambda: _tagset_preset("KAT"),
    ),
    "syc_tagset": (
        "Syriac — bespoke tagset (SYC)",
        lambda: _tagset_preset("SYC"),
    ),
    "ner_basic": (
        "Named Entity Recognition (BIO)",
        _ner_preset,
    ),
    "lemma_only": (
        "Lemmatization only",
        _lemma_only_preset,
    ),
    "custom": (
        "Custom — define your own",
        _custom_preset,
    ),
}
def list_presets() -> list[tuple[str, str]]:
    """Return ``[(preset_key, display_label), ...]`` in registry order."""
    pairs = []
    for preset_key, entry in PRESETS.items():
        display_label, _factory = entry
        pairs.append((preset_key, display_label))
    return pairs
def from_preset(key: str) -> AnnotationSchema:
    """Build and return the schema registered under ``key``.

    Raises:
        KeyError: if ``key`` is not in ``PRESETS``; the message lists the
            known keys.
    """
    entry = PRESETS.get(key)
    if entry is None:
        raise KeyError(f"Unknown preset: {key}. Known: {list(PRESETS)}")
    _label, build = entry
    return build()