"""AnnotationSchema: presets, JSON Schema builder, output validator.

A single internal representation handles POS, NER, lemmatization, and any
user-defined per-token task. See README for the contract.
"""
from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass, field, asdict
from typing import Any, Optional

from jsonschema import Draft202012Validator

from paths import TAGSETS_DIR, TUTORIAL_SCHEMAS_DIR, LANGUAGES, read_text

# Known aggregator names. NOTE(review): nothing in this part of the module
# checks Field.aggregator against this list -- presumably consumed elsewhere.
AGGREGATORS = ["vote", "lcs", "min", "max", "priority", "vote_per_subfield"]
# The three field kinds that _field_to_json distinguishes.
FIELD_TYPES = ["string", "enum", "object"]


@dataclass
class Subfield:
    """One named member of an ``object``-typed :class:`Field`."""

    name: str
    values: Optional[list[str]] = None  # None => free string
    nullable: bool = True


@dataclass
class Field:
    """One per-token annotation field of a schema."""

    name: str
    type: str  # "string" | "enum" | "object"
    values: list[str] = field(default_factory=list)  # for enum
    nullable: bool = True
    aggregator: str = "vote"
    subfields: list[Subfield] = field(default_factory=list)  # for object


@dataclass
class AnnotationSchema:
    """A named task plus the ordered list of fields every token carries."""

    task_name: str
    fields: list[Field]
    language: str = ""
    description: str = ""

    def to_dict(self) -> dict:
        """Return a plain-dict form of the schema.

        The result is what :func:`schema_from_dict` reads back and what
        :meth:`hash` serialises.
        """
        return {
            "task_name": self.task_name,
            "language": self.language,
            "description": self.description,
            "fields": [_field_to_dict(f) for f in self.fields],
        }

    def hash(self) -> str:
        """Return a short content fingerprint of the schema.

        The value is the first 10 hex characters of the SHA-1 digest of the
        key-sorted JSON dump of :meth:`to_dict`.
        """
        payload = json.dumps(self.to_dict(), sort_keys=True, ensure_ascii=False)
        return hashlib.sha1(payload.encode("utf-8")).hexdigest()[:10]


def _field_to_dict(f: Field) -> dict:
    """Serialise one field; ``values``/``subfields`` appear only when relevant.

    ``values`` is emitted for enum fields only and ``subfields`` for object
    fields only, so the dict carries no keys that are meaningless for the
    field's type.
    """
    d = {
        "name": f.name,
        "type": f.type,
        "nullable": f.nullable,
        "aggregator": f.aggregator,
    }
    if f.type == "enum":
        # Copy so callers mutating the dict cannot reach into the schema
        # (asdict() below already deep-copies the subfields).
        d["values"] = list(f.values)
    if f.type == "object":
        d["subfields"] = [asdict(s) for s in f.subfields]
    return d


def _field_from_dict(d: dict) -> Field:
    """Build a :class:`Field` from its dict form.

    ``name`` and ``type`` are mandatory (``KeyError`` if missing); every other
    key falls back to the dataclass default. A ``values`` or ``subfields``
    entry that is present but ``None`` is treated like a missing one, and the
    values list is copied so the new Field does not alias the input dict.
    """
    subs = [Subfield(**s) for s in (d.get("subfields") or [])]
    return Field(
        name=d["name"],
        type=d["type"],
        values=list(d.get("values") or []),
        nullable=d.get("nullable", True),
        aggregator=d.get("aggregator", "vote"),
        subfields=subs,
    )


def schema_from_dict(d: dict) -> AnnotationSchema:
    """Build an :class:`AnnotationSchema` from its dict form.

    Every top-level key is optional: ``task_name`` defaults to ``"custom"``,
    ``language`` and ``description`` to ``""``, and ``fields`` to no fields.
    """
    return AnnotationSchema(
        task_name=d.get("task_name", "custom"),
        language=d.get("language", ""),
        description=d.get("description", ""),
        fields=[_field_from_dict(f) for f in d.get("fields", [])],
    )


# ---------------------------------------------------------------------------
# JSON Schema emission (Draft 2020-12) — matches pos_lemma_morph_schema.json
# ---------------------------------------------------------------------------

def to_json_schema(s: AnnotationSchema) -> dict:
    """Emit the Draft 2020-12 JSON Schema for one annotated sentence.

    The document is an object with ``sentence_id``, ``language`` and
    ``tokens``; each token requires ``surface`` plus every schema field and
    allows no additional properties.
    """
    token_props: dict[str, Any] = {"surface": {"type": "string"}}
    required = ["surface"]
    for f in s.fields:
        token_props[f.name] = _field_to_json(f)
        # ``required`` must hold unique names; a field called "surface" or a
        # repeated field name would otherwise be listed twice.
        if f.name not in required:
            required.append(f.name)
    return {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": f"{s.task_name} annotation",
        "type": "object",
        "required": ["sentence_id", "language", "tokens"],
        "additionalProperties": False,
        "properties": {
            "sentence_id": {"type": "string"},
            "language": {"type": "string"},
            "tokens": {
                "type": "array",
                "items": {
                    "type": "object",
                    "required": required,
                    "additionalProperties": False,
                    "properties": token_props,
                },
            },
        },
    }


def _field_to_json(f: Field) -> dict:
    """Return the JSON Schema fragment for a single field.

    Any ``type`` other than ``"enum"`` or ``"object"`` is emitted as a string.
    Nullable enums list ``None`` among their allowed values in addition to
    widening ``type``, because ``enum`` alone would reject ``null``.
    """
    if f.type == "enum":
        out: dict[str, Any] = {
            "type": ["string", "null"] if f.nullable else "string",
            "enum": list(f.values) + ([None] if f.nullable else []),
        }
        return out
    if f.type == "object":
        # NOTE(review): f.nullable is not consulted here and no subfield is
        # marked required, so the object itself can never be null but may be
        # empty -- confirm this is intended.
        props = {}
        for sub in f.subfields:
            t = ["string", "null"] if sub.nullable else "string"
            if sub.values:
                props[sub.name] = {
                    "type": t,
                    "enum": list(sub.values) + ([None] if sub.nullable else []),
                }
            else:
                # None or an empty list both mean "free string".
                props[sub.name] = {"type": t}
        return {"type": "object", "additionalProperties": False, "properties": props}
    # string
    return {"type": ["string", "null"] if f.nullable else "string"}


def validate(s: AnnotationSchema, payload: dict) -> tuple[bool, list[str]]:
    """Validate ``payload`` against the JSON Schema derived from ``s``.

    Returns ``(ok, errors)``. Each error is formatted ``"<path>: <message>"``
    with the path segments joined by ``/``; the path is empty for errors at
    the document root.
    """
    schema = to_json_schema(s)
    validator = Draft202012Validator(schema)
    errors = [
        f"{'/'.join(str(p) for p in e.path)}: {e.message}"
        for e in validator.iter_errors(payload)
    ]
    return (len(errors) == 0, errors)
# ---------------------------------------------------------------------------
# Presets
# ---------------------------------------------------------------------------

def _parse_tagset(lang_code: str) -> list[str]:
    """Read ``<lang_code>_tagset.txt`` from TAGSETS_DIR and return its tags.

    Lines look like ``"TAG Description"`` or ``"TAG\\tDescription"``; the tag
    is the first whitespace-delimited token. Blank lines are skipped. Tags are
    returned in file order.
    """
    path = TAGSETS_DIR / f"{lang_code}_tagset.txt"
    tags = []
    for raw_line in read_text(path).splitlines():
        # split(None, 1) ignores leading whitespace and yields [] for a
        # blank or whitespace-only line.
        parts = raw_line.split(None, 1)
        if parts:
            tags.append(parts[0])
    return tags


def _lemma_field() -> Field:
    """Return a fresh nullable free-string ``lemma`` field (aggregator ``lcs``)."""
    return Field(name="lemma", type="string", nullable=True, aggregator="lcs")


def _confidence_field() -> Field:
    """Return a fresh required low/medium/high ``confidence`` field (aggregator ``min``)."""
    return Field(
        name="confidence",
        type="enum",
        values=["low", "medium", "high"],
        nullable=False,
        aggregator="min",
    )


def _comment_field() -> Field:
    """Return a fresh nullable free-string ``comment`` field (aggregator ``priority``)."""
    return Field(name="comment", type="string", nullable=True, aggregator="priority")


def _ud_preset() -> AnnotationSchema:
    """Read the tutorial's UD schema and convert to AnnotationSchema.

    The UPOS value list is taken from the tutorial JSON Schema. Only the
    *names* of the morphological features are taken from it: every feature
    becomes a nullable free-string subfield.
    """
    raw = json.loads(read_text(TUTORIAL_SCHEMAS_DIR / "pos_lemma_morph_schema.json"))
    token_props = raw["properties"]["tokens"]["items"]["properties"]
    upos_values = token_props["upos"]["enum"]
    feature_subs = [
        Subfield(name=name, values=None, nullable=True)
        for name in token_props["features"]["properties"]
    ]
    return AnnotationSchema(
        task_name="UD POS + lemma + morphology",
        description="Universal Dependencies UPOS + morphological features.",
        fields=[
            _lemma_field(),
            Field(name="upos", type="enum", values=upos_values, nullable=False, aggregator="vote"),
            Field(name="features", type="object", subfields=feature_subs, aggregator="vote_per_subfield"),
            _confidence_field(),
            _comment_field(),
        ],
    )


def _tagset_preset(lang_code: str) -> AnnotationSchema:
    """Build the bespoke-tagset POS + lemma preset for ``lang_code``.

    The language display name comes from LANGUAGES, falling back to the code
    itself when the code is not listed there.
    """
    tags = _parse_tagset(lang_code)
    language_name = LANGUAGES.get(lang_code, lang_code)
    return AnnotationSchema(
        task_name=f"{lang_code} POS + lemma (bespoke tagset)",
        language=language_name,
        description=f"Bespoke compound POS tags for {language_name}.",
        fields=[
            _lemma_field(),
            Field(name="pos", type="enum", values=tags, nullable=False, aggregator="vote"),
            _confidence_field(),
            _comment_field(),
        ],
    )


def _ner_preset() -> AnnotationSchema:
    """Build the per-token BIO named-entity preset (PER / LOC / ORG / MISC)."""
    return AnnotationSchema(
        task_name="Named Entity Recognition (BIO)",
        description="Per-token BIO tags for PER / LOC / ORG / MISC.",
        fields=[
            Field(
                name="ner",
                type="enum",
                values=[
                    "O",
                    "B-PER", "I-PER",
                    "B-LOC", "I-LOC",
                    "B-ORG", "I-ORG",
                    "B-MISC", "I-MISC",
                ],
                nullable=False,
                aggregator="vote",
            ),
            _confidence_field(),
            _comment_field(),
        ],
    )


def _lemma_only_preset() -> AnnotationSchema:
    """Build the lemmatization-only preset (lemma + confidence)."""
    return AnnotationSchema(
        task_name="Lemmatization only",
        description="One lemma per token.",
        fields=[
            _lemma_field(),
            _confidence_field(),
        ],
    )


def _custom_preset() -> AnnotationSchema:
    """Build the placeholder preset used as a starting point for custom tasks."""
    return AnnotationSchema(
        task_name="Custom task",
        description="Define your own fields in the Schema tab.",
        fields=[
            Field(name="label", type="enum", values=["LABEL_A", "LABEL_B"], nullable=False, aggregator="vote"),
            _confidence_field(),
        ],
    )


# preset_key -> (display label, zero-argument factory). Factories run lazily,
# so tagset / tutorial files are only read when a preset is actually requested.
PRESETS = {
    "ud_upos_morph": ("UD UPOS + morphology (UD-standard)", _ud_preset),
    "grc_tagset": ("Ancient Greek — bespoke tagset (GRC)", lambda: _tagset_preset("GRC")),
    "hye_tagset": ("Old Armenian — bespoke tagset (HYE)", lambda: _tagset_preset("HYE")),
    "kat_tagset": ("Old Georgian — bespoke tagset (KAT)", lambda: _tagset_preset("KAT")),
    "syc_tagset": ("Syriac — bespoke tagset (SYC)", lambda: _tagset_preset("SYC")),
    "ner_basic": ("Named Entity Recognition (BIO)", _ner_preset),
    "lemma_only": ("Lemmatization only", _lemma_only_preset),
    "custom": ("Custom — define your own", _custom_preset),
}


def list_presets() -> list[tuple[str, str]]:
    """Return [(preset_key, display_label), ...]."""
    return [(k, label) for k, (label, _) in PRESETS.items()]


def from_preset(key: str) -> AnnotationSchema:
    """Build and return the schema registered under ``key``.

    Raises:
        KeyError: if ``key`` is not a known preset; the message lists the
            known keys.
    """
    try:
        _label, factory = PRESETS[key]
    except KeyError:
        raise KeyError(f"Unknown preset: {key}. Known: {list(PRESETS)}") from None
    # Called outside the try so a KeyError raised while building the preset
    # is not mistaken for an unknown key.
    return factory()