# 📄 file contents: /Users/greg/mono/logic/world/src/world/core/layers/args.py

```
"""
Arguments layer - identify verb arguments per clause per sentence.
"""

import json

from world.core.layers import Layer, LayerResult, register_layer

ARG_PROMPT = """Identify arguments of the verb.

CRITICAL: end index is EXCLUSIVE (Python slice style).
- To include token at index 4, end must be 5

For clause "they are mortal" with verb "are" at index 1:
- agent: start=0, end=1 → "they"
- theme: start=2, end=3 → "mortal"

Reply JSON:
{
  "arguments": [
    {"start": 0, "end": 1, "role": "agent"},
    {"start": 2, "end": 3, "role": "theme"}
  ]
}

Roles: agent, patient, theme, goal, source, location, instrument, time
"""


class ArgsLayer(Layer):
    id = "args"
    depends_on = ["base", "clauses"]
    ext = ".args"

    def process(self, inputs: dict, context: dict) -> LayerResult:
        base_data = inputs.get("base", {})
        clauses_data = inputs.get("clauses", {})
        sentences = base_data.get("sentences", [])
        clause_sentences = clauses_data.get("sentences", [])

        openai = context.get("openai")
        if not openai:
            return LayerResult(False, None, "no openai client")

        # Build lookup
        sent_tokens = {s["idx"]: [t["text"] for t in s["tokens"]] for s in sentences}

        result_sentences = []
        total_args = 0

        for clause_sent in clause_sentences:
            sent_idx = clause_sent["sentence_idx"]
            tokens = sent_tokens.get(sent_idx, [])
            clause_results = []

            for clause in clause_sent.get("clauses", []):
                clause_tokens = tokens[clause["start"]:clause["end"]]
                verb_rel = clause["verb_index"] - clause["start"]

                prompt = "Clause tokens:\n" + "\n".join(f"{i}: {t}" for i, t in enumerate(clause_tokens))
                prompt += f"\n\nVerb: {clause_tokens[verb_rel]} (index {verb_rel})"
                prompt += f"\nTotal: {len(clause_tokens)} tokens"

                response = openai.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": ARG_PROMPT},
                        {"role": "user", "content": prompt},
                    ],
                    response_format={"type": "json_object"},
                )
                args = json.loads(response.choices[0].message.content)

                clause_results.append({
                    "clause_start": clause["start"],
                    "clause_end": clause["end"],
                    "clause_label": clause.get("label", ""),
                    "verb_index": clause["verb_index"],
                    "arguments": args.get("arguments", []),
                })
                total_args += len(args.get("arguments", []))

            result_sentences.append({
                "sentence_idx": sent_idx,
                "clauses": clause_results,
            })

        return LayerResult(True, {"sentences": result_sentences}, f"{total_args} arguments")

    def parse_dsl(self, text: str) -> dict:
        """
        Parse:
            # sentence 0
            clause [1:5] antecedent verb=2
              agent [0:1]
              theme [2:4]
        """
        import re

        sentences = []
        current_sent = None
        current_clause = None

        for line in text.strip().split("\n"):
            orig_line = line
            line = line.strip()
            if not line:
                continue

            if line.startswith("# sentence"):
                if current_clause is not None and current_sent is not None:
                    current_sent["clauses"].append(current_clause)
                if current_sent is not None:
                    sentences.append(current_sent)
                idx = int(line.split()[-1])
                current_sent = {"sentence_idx": idx, "clauses": []}
                current_clause = None
            elif line.startswith("clause"):
                if current_clause is not None and current_sent is not None:
                    current_sent["clauses"].append(current_clause)
                match = re.match(r"clause\s+\[(\d+):(\d+)\]\s*(\w*)\s*verb=(\d+)", line)
                if match:
                    start, end, label, verb = match.groups()
                    current_clause = {
                        "clause_start": int(start),
                        "clause_end": int(end),
                        "clause_label": label,
                        "verb_index": int(verb),
                        "arguments": [],
                    }
            elif orig_line.startswith(" ") and current_clause is not None:
                match = re.match(r"(\w+)\s+\[(\d+):(\d+)\]", line)
                if match:
                    role, start, end = match.groups()
                    current_clause["arguments"].append({
                        "role": role,
                        "start": int(start),
                        "end": int(end),
                    })

        if current_clause is not None and current_sent is not None:
            current_sent["clauses"].append(current_clause)
        if current_sent is not None:
            sentences.append(current_sent)

        return {"sentences": sentences}

    def format_dsl(self, data: dict) -> str:
        lines = []
        for sent in data.get("sentences", []):
            lines.append(f"# sentence {sent['sentence_idx']}")
            for c in sent.get("clauses", []):
                lines.append(f"clause [{c['clause_start']}:{c['clause_end']}] {c['clause_label']} verb={c['verb_index']}")
                for arg in c.get("arguments", []):
                    lines.append(f" {arg['role']} [{arg['start']}:{arg['end']}]")
            lines.append("")
        return "\n".join(lines)


register_layer(ArgsLayer())
```
# 📄 file contents: /Users/greg/mono/logic/world/src/world/core/layers/base.py

```
"""
Base layer - establishes the canonical coordinate system.

Combines: tokenization + spell correction + sentence segmentation.
Output is immutable foundation for all other layers.
Every token is addressable as (sentence_idx, token_idx).
"""

import json

from world.core.layers import Layer, LayerResult, register_layer
from world.core.tokenize import tokenize, Token, SpellCorrector

SEGMENT_PROMPT = """You are a sentence segmenter. Given numbered tokens, identify where each sentence begins and ends.

Rules:
- "If X then Y" is ONE sentence, not two
- Sentences end at periods, question marks, or exclamation marks
- If no punctuation, the whole text is one sentence
- Include ALL tokens

Reply JSON:
{
  "sentences": [
    {"start": 0, "end": 5},
    {"start": 5, "end": 12}
  ]
}

Token indices are 0-based. End is exclusive (Python slice style).
"""


class BaseLayer(Layer):
    id = "base"
    depends_on = []
    ext = ".base"

    def process(self, inputs: dict, context: dict) -> LayerResult:
        doc = inputs.get("_doc")
        if not doc:
            return LayerResult(False, None, "no document")

        openai = context.get("openai")
        if not openai:
            return LayerResult(False, None, "no openai client")

        # Step 1: Tokenize
        raw_tokens = tokenize(doc.text)

        # Step 2: Spell correct
        corrector = SpellCorrector(openai)
        corrected = corrector.correct(raw_tokens)

        # Build flat token list
        flat_tokens = []
        for i, c in enumerate(corrected):
            flat_tokens.append({
                "flat_idx": i,
                "text": c.corrected,
                "original": c.original,
                "char_pos": c.position,
            })

        # Step 3: Segment into sentences
        token_texts = [t["text"] for t in flat_tokens]
        prompt = "Tokens:\n" + "\n".join(f"{i}: {t}" for i, t in enumerate(token_texts))
        prompt += f"\n\nTotal: {len(token_texts)} tokens (indices 0 to {len(token_texts)-1})"

        response = openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": SEGMENT_PROMPT},
                {"role": "user", "content": prompt},
            ],
            response_format={"type": "json_object"},
        )
        seg_data = json.loads(response.choices[0].message.content)
        sentence_bounds = seg_data.get("sentences", [])

        # Safety: if empty or incomplete
        if not sentence_bounds:
            sentence_bounds = [{"start": 0, "end": len(flat_tokens)}]
        elif sentence_bounds[-1]["end"] < len(flat_tokens):
            sentence_bounds[-1]["end"] = len(flat_tokens)

        # Step 4: Build final structure
        sentences = []
        for sent_idx, bounds in enumerate(sentence_bounds):
            sent_tokens = []
            for flat_idx in range(bounds["start"], bounds["end"]):
                tok = flat_tokens[flat_idx]
                sent_tokens.append({
                    "idx": flat_idx - bounds["start"],
                    "text": tok["text"],
                    "original": tok["original"],
                    "char_pos": tok["char_pos"],
                    "flat_idx": flat_idx,
                })
            sentences.append({
                "idx": sent_idx,
                "tokens": sent_tokens,
            })

        n_tokens = sum(len(s["tokens"]) for s in sentences)
        n_corrections = sum(1 for t in flat_tokens if t["text"] != t["original"])

        return LayerResult(
            True,
            {"sentences": sentences},
            f"{len(sentences)} sentences, {n_tokens} tokens, {n_corrections} corrections"
        )

    def parse_dsl(self, text: str) -> dict:
        """
        Parse:
            # sentence 0
            0: If
            1: someone
            2: is (was: iz)
            ...
        """
        sentences = []
        current_sent = None

        for line in text.strip().split("\n"):
            line = line.strip()
            if not line:
                continue

            if line.startswith("# sentence"):
                if current_sent is not None:
                    sentences.append(current_sent)
                idx = int(line.split()[-1])
                current_sent = {"idx": idx, "tokens": []}
            elif current_sent is not None and ":" in line:
                parts = line.split(":", 1)
                tok_idx = int(parts[0].strip())
                rest = parts[1].strip()

                # Check for "(was: ...)"
                original = rest
                text = rest
                if "(was:" in rest:
                    text = rest.split("(was:")[0].strip()
                    original = rest.split("(was:")[1].rstrip(")").strip()

                current_sent["tokens"].append({
                    "idx": tok_idx,
                    "text": text,
                    "original": original,
                    "char_pos": 0,
                    "flat_idx": 0,
                })

        if current_sent is not None:
            sentences.append(current_sent)

        return {"sentences": sentences}

    def format_dsl(self, data: dict) -> str:
        lines = []
        for sent in data.get("sentences", []):
            lines.append(f"# sentence {sent['idx']}")
            for tok in sent["tokens"]:
                if tok.get("original") and tok["original"] != tok["text"]:
                    lines.append(f"{tok['idx']}: {tok['text']} (was: {tok['original']})")
                else:
                    lines.append(f"{tok['idx']}: {tok['text']}")
            lines.append("")
        return "\n".join(lines)

    def validate(self, data: dict) -> list[str]:
        errors = []
        sentences = data.get("sentences", [])
        for sent in sentences:
            for i, tok in enumerate(sent["tokens"]):
                if tok["idx"] != i:
                    errors.append(f"Sentence {sent['idx']}: token {i} has wrong idx {tok['idx']}")
        return errors


register_layer(BaseLayer())
```
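A minimal round-trip sketch of the `.base` DSL. The module path `world.core.layers.base` is an assumption from the file location; `parse_dsl` and `format_dsl` operate on plain dicts, so no OpenAI client or document store is needed.

```
# Sketch: exercise BaseLayer's DSL round-trip directly (no LLM calls involved).
from world.core.layers.base import BaseLayer  # module path assumed from the file location

layer = BaseLayer()
dsl = """\
# sentence 0
0: If
1: someone
2: is (was: iz)
3: a
4: man
"""
data = layer.parse_dsl(dsl)
tok = data["sentences"][0]["tokens"][2]
assert tok["text"] == "is" and tok["original"] == "iz"   # spell-correction note survives parsing
print(layer.format_dsl(data))                            # re-emits "2: is (was: iz)"
```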
# 📄 file contents: /Users/greg/mono/logic/world/src/world/core/layers/clauses.py

```
"""
Clauses layer - identify clause boundaries per sentence.
"""

import json

from world.core.layers import Layer, LayerResult, register_layer

CLAUSE_PROMPT = """Identify all clauses in this sentence.

CRITICAL: end index is EXCLUSIVE (Python slice style).
- To include token 8, end must be 9

For "If someone is a man then they are mortal" (tokens 0-8):
- Clause 1: start=1, end=5 → "someone is a man" (verb_index=2)
- Clause 2: start=6, end=9 → "they are mortal" (verb_index=7)
- skip_tokens: [0, 5] → "If", "then"

Reply JSON:
{
  "clauses": [
    {"start": 1, "end": 5, "verb_index": 2, "label": "antecedent"},
    {"start": 6, "end": 9, "verb_index": 7, "label": "consequent"}
  ],
  "skip_tokens": [0, 5]
}
"""


class ClausesLayer(Layer):
    id = "clauses"
    depends_on = ["base"]
    ext = ".clause"

    def process(self, inputs: dict, context: dict) -> LayerResult:
        base_data = inputs.get("base", {})
        sentences = base_data.get("sentences", [])

        openai = context.get("openai")
        if not openai:
            return LayerResult(False, None, "no openai client")

        result_sentences = []
        total_clauses = 0

        for sent in sentences:
            tokens = [t["text"] for t in sent["tokens"]]
            prompt = "Tokens:\n" + "\n".join(f"{i}: {t}" for i, t in enumerate(tokens))
            prompt += f"\n\nTotal: {len(tokens)} tokens"

            response = openai.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": CLAUSE_PROMPT},
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
            )
            clause_data = json.loads(response.choices[0].message.content)

            result_sentences.append({
                "sentence_idx": sent["idx"],
                "clauses": clause_data.get("clauses", []),
                "skip_tokens": clause_data.get("skip_tokens", []),
            })
            total_clauses += len(clause_data.get("clauses", []))

        return LayerResult(True, {"sentences": result_sentences}, f"{total_clauses} clauses")

    def parse_dsl(self, text: str) -> dict:
        """
        Parse:
            # sentence 0
            [1:5] antecedent verb=2
            [6:9] consequent verb=7
            skip: 0 5
        """
        import re

        sentences = []
        current = None

        for line in text.strip().split("\n"):
            line = line.strip()
            if not line:
                continue

            if line.startswith("# sentence"):
                if current is not None:
                    sentences.append(current)
                idx = int(line.split()[-1])
                current = {"sentence_idx": idx, "clauses": [], "skip_tokens": []}
            elif current is not None:
                if line.startswith("skip:"):
                    _, rest = line.split(":", 1)
                    current["skip_tokens"] = [int(x) for x in rest.strip().split()]
                elif line.startswith("["):
                    match = re.match(r"\[(\d+):(\d+)\]\s*(\w*)\s*verb=(\d+)", line)
                    if match:
                        start, end, label, verb = match.groups()
                        current["clauses"].append({
                            "start": int(start),
                            "end": int(end),
                            "label": label or "main",
                            "verb_index": int(verb),
                        })

        if current is not None:
            sentences.append(current)

        return {"sentences": sentences}

    def format_dsl(self, data: dict) -> str:
        lines = []
        for sent in data.get("sentences", []):
            lines.append(f"# sentence {sent['sentence_idx']}")
            for c in sent.get("clauses", []):
                lines.append(f"[{c['start']}:{c['end']}] {c.get('label', '')} verb={c['verb_index']}")
            skip = sent.get("skip_tokens", [])
            if skip:
                lines.append(f"skip: {' '.join(str(s) for s in skip)}")
            lines.append("")
        return "\n".join(lines)

    def validate(self, data: dict) -> list[str]:
        errors = []
        for sent in data.get("sentences", []):
            for c in sent.get("clauses", []):
                if c["start"] >= c["end"]:
                    errors.append(f"Sentence {sent['sentence_idx']}: invalid clause [{c['start']}:{c['end']}]")
                if c["verb_index"] < c["start"] or c["verb_index"] >= c["end"]:
                    errors.append(f"Sentence {sent['sentence_idx']}: verb {c['verb_index']} outside clause")
        return errors


register_layer(ClausesLayer())
```
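A small illustration of `ClausesLayer.validate` catching an out-of-range verb index on hand-written data; the module path is assumed as above.

```
# Sketch: validate hand-written clause data; verb_index must fall inside [start, end).
from world.core.layers.clauses import ClausesLayer  # module path assumed

layer = ClausesLayer()
data = {"sentences": [{
    "sentence_idx": 0,
    "clauses": [{"start": 1, "end": 5, "verb_index": 7, "label": "antecedent"}],
    "skip_tokens": [0],
}]}
print(layer.validate(data))   # ['Sentence 0: verb 7 outside clause']
```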
# 📄 file contents: /Users/greg/mono/logic/world/src/world/core/layers/coref.py

```
"""
Coreference layer - link coreferent mentions across sentences.
"""

import json

from world.core.layers import Layer, LayerResult, register_layer

COREF_PROMPT = """Identify coreference links in these sentences.

Coreference: two mentions that refer to the same entity.
Use (sentence_idx, token_idx) pairs.

For "If someone is a man then they are mortal":
- "someone" at (0, 1) and "they" at (0, 6) refer to same person

Reply JSON:
{
  "coreferences": [
    {"a": [0, 1], "b": [0, 6]}
  ]
}

Each entry has "a" and "b" as [sentence_idx, token_idx] pairs.
Empty list if no coreferences.
"""


class CorefLayer(Layer):
    id = "coref"
    depends_on = ["base"]
    ext = ".coref"

    def process(self, inputs: dict, context: dict) -> LayerResult:
        base_data = inputs.get("base", {})
        sentences = base_data.get("sentences", [])

        openai = context.get("openai")
        if not openai:
            return LayerResult(False, None, "no openai client")

        # Build prompt with all sentences
        prompt_lines = []
        for sent in sentences:
            prompt_lines.append(f"Sentence {sent['idx']}:")
            for tok in sent["tokens"]:
                prompt_lines.append(f" ({sent['idx']}, {tok['idx']}): {tok['text']}")
        prompt = "\n".join(prompt_lines)

        response = openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": COREF_PROMPT},
                {"role": "user", "content": prompt},
            ],
            response_format={"type": "json_object"},
        )
        data = json.loads(response.choices[0].message.content)

        n_corefs = len(data.get("coreferences", []))
        return LayerResult(True, data, f"{n_corefs} coreferences")

    def parse_dsl(self, text: str) -> dict:
        """
        Parse:
            (0, 1) = (0, 6)
            (0, 3) = (1, 0)
        """
        import re

        coreferences = []
        for line in text.strip().split("\n"):
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            match = re.match(r"\((\d+),\s*(\d+)\)\s*=\s*\((\d+),\s*(\d+)\)", line)
            if match:
                s1, t1, s2, t2 = match.groups()
                coreferences.append({
                    "a": [int(s1), int(t1)],
                    "b": [int(s2), int(t2)],
                })
        return {"coreferences": coreferences}

    def format_dsl(self, data: dict) -> str:
        lines = []
        for c in data.get("coreferences", []):
            a = c["a"]
            b = c["b"]
            lines.append(f"({a[0]}, {a[1]}) = ({b[0]}, {b[1]})")
        return "\n".join(lines) if lines else "# no coreferences"


register_layer(CorefLayer())
```
""" import json from world.core.layers import Layer, LayerResult, register_layer COREF_PROMPT = """Identify coreference links in these sentences. Coreference: two mentions that refer to the same entity. Use (sentence_idx, token_idx) pairs. For "If someone is a man then they are mortal": - "someone" at (0, 1) and "they" at (0, 6) refer to same person Reply JSON: { "coreferences": [ {"a": [0, 1], "b": [0, 6]} ] } Each entry has "a" and "b" as [sentence_idx, token_idx] pairs. Empty list if no coreferences. """ class CorefLayer(Layer): id = "coref" depends_on = ["base"] ext = ".coref" def process(self, inputs: dict, context: dict) -> LayerResult: base_data = inputs.get("base", {}) sentences = base_data.get("sentences", []) openai = context.get("openai") if not openai: return LayerResult(False, None, "no openai client") # Build prompt with all sentences prompt_lines = [] for sent in sentences: prompt_lines.append(f"Sentence {sent['idx']}:") for tok in sent["tokens"]: prompt_lines.append(f" ({sent['idx']}, {tok['idx']}): {tok['text']}") prompt = "\n".join(prompt_lines) response = openai.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": COREF_PROMPT}, {"role": "user", "content": prompt}, ], response_format={"type": "json_object"}, ) data = json.loads(response.choices[0].message.content) n_corefs = len(data.get("coreferences", [])) return LayerResult(True, data, f"{n_corefs} coreferences") def parse_dsl(self, text: str) -> dict: """ Parse: (0, 1) = (0, 6) (0, 3) = (1, 0) """ import re coreferences = [] for line in text.strip().split("\n"): line = line.strip() if not line or line.startswith("#"): continue match = re.match(r"\((\d+),\s*(\d+)\)\s*=\s*\((\d+),\s*(\d+)\)", line) if match: s1, t1, s2, t2 = match.groups() coreferences.append({ "a": [int(s1), int(t1)], "b": [int(s2), int(t2)], }) return {"coreferences": coreferences} def format_dsl(self, data: dict) -> str: lines = [] for c in data.get("coreferences", []): a = c["a"] b = c["b"] lines.append(f"({a[0]}, {a[1]}) = ({b[0]}, {b[1]})") return "\n".join(lines) if lines else "# no coreferences" register_layer(CorefLayer())``` # 📄 file contents: /Users/greg/mono/logic/world/src/world/core/layers/entities.py ``` """ Entities layer - identify named entities, types, and quantifiers. """ import json from world.core.layers import Layer, LayerResult, register_layer ENTITIES_PROMPT = """Identify entities, types, and quantifiers in these sentences. ENTITIES: Named/specific things (proper nouns, definite references) - "Socrates" → entity, type person - "the bank" → entity, type place - "my car" → entity, type object TYPES: Categories/kinds (common nouns used as types in predicates) - "man", "mortal", "philosopher" → types QUANTIFIERS: Words that introduce variables - "someone", "everyone", "anyone" → quantifier, introduce variable - "they", "it" when referring to quantified entity → NOT new quantifier, just coreference Reply JSON: { "entities": [ {"id": "socrates", "type": "person", "mention": [0, 0]} ], "types": [ {"id": "man", "mention": [0, 4]}, {"id": "mortal", "mention": [0, 8]} ], "quantifiers": [ {"token": "someone", "var": "x0", "mention": [0, 1]} ] } mention is [sentence_idx, token_idx]. Use lowercase for ids. Variable names: x0, x1, x2... 
""" class EntitiesLayer(Layer): id = "entities" depends_on = ["base"] ext = ".ent" def process(self, inputs: dict, context: dict) -> LayerResult: base_data = inputs.get("base", {}) sentences = base_data.get("sentences", []) openai = context.get("openai") if not openai: return LayerResult(False, None, "no openai client") # Build prompt with all sentences prompt_lines = [] for sent in sentences: prompt_lines.append(f"Sentence {sent['idx']}:") for tok in sent["tokens"]: prompt_lines.append(f" ({sent['idx']}, {tok['idx']}): {tok['text']}") prompt = "\n".join(prompt_lines) response = openai.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": ENTITIES_PROMPT}, {"role": "user", "content": prompt}, ], response_format={"type": "json_object"}, ) data = json.loads(response.choices[0].message.content) n_ent = len(data.get("entities", [])) n_types = len(data.get("types", [])) n_quant = len(data.get("quantifiers", [])) return LayerResult(True, data, f"{n_ent} entities, {n_types} types, {n_quant} quantifiers") def parse_dsl(self, text: str) -> dict: """ Parse: # entities socrates : person @ (0, 0) # types man @ (0, 4) mortal @ (0, 8) # quantifiers someone → x0 @ (0, 1) """ import re entities = [] types = [] quantifiers = [] section = None for line in text.strip().split("\n"): line = line.strip() if not line: continue if line == "# entities": section = "entities" elif line == "# types": section = "types" elif line == "# quantifiers": section = "quantifiers" elif section == "entities": # socrates : person @ (0, 0) match = re.match(r"(\w+)\s*:\s*(\w+)\s*@\s*\((\d+),\s*(\d+)\)", line) if match: id_, type_, s, t = match.groups() entities.append({ "id": id_, "type": type_, "mention": [int(s), int(t)], }) elif section == "types": # man @ (0, 4) match = re.match(r"(\w+)\s*@\s*\((\d+),\s*(\d+)\)", line) if match: id_, s, t = match.groups() types.append({ "id": id_, "mention": [int(s), int(t)], }) elif section == "quantifiers": # someone → x0 @ (0, 1) match = re.match(r"(\w+)\s*[→->]+\s*(\w+)\s*@\s*\((\d+),\s*(\d+)\)", line) if match: token, var, s, t = match.groups() quantifiers.append({ "token": token, "var": var, "mention": [int(s), int(t)], }) return { "entities": entities, "types": types, "quantifiers": quantifiers, } def format_dsl(self, data: dict) -> str: lines = [] entities = data.get("entities", []) if entities: lines.append("# entities") for e in entities: m = e["mention"] lines.append(f"{e['id']} : {e['type']} @ ({m[0]}, {m[1]})") lines.append("") types = data.get("types", []) if types: lines.append("# types") for t in types: m = t["mention"] lines.append(f"{t['id']} @ ({m[0]}, {m[1]})") lines.append("") quantifiers = data.get("quantifiers", []) if quantifiers: lines.append("# quantifiers") for q in quantifiers: m = q["mention"] lines.append(f"{q['token']} → {q['var']} @ ({m[0]}, {m[1]})") lines.append("") return "\n".join(lines) if lines else "# no entities" register_layer(EntitiesLayer())``` # 📄 file contents: /Users/greg/mono/logic/world/src/world/core/layers/ground.py ``` """ Ground layer - expand rules with entity bindings using KB. 
""" from world.core.layers import Layer, LayerResult, register_layer class GroundLayer(Layer): id = "ground" depends_on = ["logic", "link", "entities"] ext = ".ground" def process(self, inputs: dict, context: dict) -> LayerResult: logic_data = inputs.get("logic", {}) link_data = inputs.get("link", {}) entities_data = inputs.get("entities", {}) logic_text = logic_data.get("text", "") kb = context.get("kb") if not kb: return LayerResult(False, None, "no KB in context") if not logic_text: return LayerResult(False, None, "no logic text") try: combined_lines = [] # Add entity declarations from KB combined_lines.append("# KB Entities") for ent in kb.entities.values(): combined_lines.append(f"entity {ent.id} : {ent.type}") combined_lines.append("") # Add types from document as entities (so they can be used as arguments) doc_types = entities_data.get("types", []) if doc_types: combined_lines.append("# Document Types (as entities)") for t in doc_types: combined_lines.append(f"entity {t['id']} : type") combined_lines.append("") # Add KB facts if kb.facts: combined_lines.append("# KB Facts") for fact in kb.facts: args_str = ", ".join(f"{k}: {v}" for k, v in fact.args.items()) combined_lines.append(f"{fact.predicate}({args_str})") combined_lines.append("") # Add KB rules if kb.rules: combined_lines.append("# KB Rules") for rule in kb.rules: vars_str = ", ".join(f"{v}:{t}" for v, t in rule.variables) prem_args = ", ".join(f"{k}: {v}" for k, v in rule.premise[1].items()) conc_args = ", ".join(f"{k}: {v}" for k, v in rule.conclusion[1].items()) line = f"rule [{vars_str}]: {rule.premise[0]}({prem_args}) -> {rule.conclusion[0]}({conc_args})" if rule.weight != 1.0: line += f" [{rule.weight}]" combined_lines.append(line) combined_lines.append("") # Add document propositions combined_lines.append("# Document Propositions") for line in logic_text.split("\n"): line = line.strip() if not line or line.startswith("#") or line.startswith("entity"): continue if line.startswith("rule"): continue combined_lines.append(line) combined_text = "\n".join(combined_lines) from world.core.logical_lang import parse_logical from world.core.horn import KnowledgeBase as HornKB, format_horn_clause doc = parse_logical(combined_text) horn_kb = HornKB.from_logical_document(doc) grounded = horn_kb.ground_all() lines = [] for clause in grounded: lines.append(format_horn_clause(clause, show_vars=False)) return LayerResult(True, { "clauses": [c.to_dict() for c in grounded], "text": "\n".join(lines), "combined_logic": combined_text, }, f"{len(grounded)} grounded clauses") except Exception as e: return LayerResult(False, None, f"ground error: {e}") def parse_dsl(self, text: str) -> dict: return {"text": text} def format_dsl(self, data: dict) -> str: return data.get("text", "") register_layer(GroundLayer())``` # 📄 file contents: /Users/greg/mono/logic/world/src/world/core/layers/link.py ``` """ Link layer - connects discourse entities to knowledge base entities. 
""" from world.core.layers import Layer, LayerResult, register_layer class LinkLayer(Layer): id = "link" depends_on = ["entities"] ext = ".link" def process(self, inputs: dict, context: dict) -> LayerResult: entities_data = inputs.get("entities", {}) # Get KB from context kb = context.get("kb") if not kb: return LayerResult(False, None, "no KB in context") discourse_entities = entities_data.get("entities", []) links = [] unlinked = [] for ent in discourse_entities: ent_id = ent["id"] kb_entity = kb.get_entity(ent_id) if kb_entity: links.append({ "discourse_id": ent_id, "discourse_type": ent["type"], "mention": ent["mention"], "kb_id": kb_entity.id, "kb_type": kb_entity.type, "status": "linked", }) else: unlinked.append({ "discourse_id": ent_id, "discourse_type": ent["type"], "mention": ent["mention"], "kb_id": None, "kb_type": None, "status": "new", }) data = { "links": links, "unlinked": unlinked, "kb_entity_count": len(kb.entities), "kb_fact_count": len(kb.facts), "kb_rule_count": len(kb.rules), } return LayerResult(True, data, f"{len(links)} linked, {len(unlinked)} new") def parse_dsl(self, text: str) -> dict: import re links = [] unlinked = [] section = None for line in text.strip().split("\n"): line = line.strip() if not line: continue if line == "# linked": section = "linked" elif line == "# unlinked": section = "unlinked" elif section == "linked": match = re.match(r"(\w+)\s*[→->]+\s*kb:(\w+)\s*@\s*\((\d+),\s*(\d+)\)", line) if match: disc_id, kb_id, s, t = match.groups() links.append({ "discourse_id": disc_id, "kb_id": kb_id, "mention": [int(s), int(t)], "status": "linked", }) elif section == "unlinked": match = re.match(r"(\w+)\s*:\s*(\w+)\s*@\s*\((\d+),\s*(\d+)\)", line) if match: disc_id, disc_type, s, t = match.groups() unlinked.append({ "discourse_id": disc_id, "discourse_type": disc_type, "mention": [int(s), int(t)], "status": "new", }) return {"links": links, "unlinked": unlinked} def format_dsl(self, data: dict) -> str: lines = [] links = data.get("links", []) if links: lines.append("# linked") for link in links: m = link["mention"] lines.append(f"{link['discourse_id']} → kb:{link['kb_id']} @ ({m[0]}, {m[1]})") lines.append("") unlinked = data.get("unlinked", []) if unlinked: lines.append("# unlinked") for ent in unlinked: m = ent["mention"] lines.append(f"{ent['discourse_id']} : {ent.get('discourse_type', '?')} @ ({m[0]}, {m[1]}) [new]") lines.append("") return "\n".join(lines) if lines else "# no entities to link" register_layer(LinkLayer())``` # 📄 file contents: /Users/greg/mono/logic/world/src/world/core/layers/logic.py ``` """ Logic layer - translate syntax to logical form, using KB links when available. 
""" from world.core.layers import Layer, LayerResult, register_layer class LogicLayer(Layer): id = "logic" depends_on = ["base", "clauses", "args", "coref", "entities", "link"] ext = ".logic" def process(self, inputs: dict, context: dict) -> LayerResult: base_data = inputs.get("base", {}) clauses_data = inputs.get("clauses", {}) args_data = inputs.get("args", {}) coref_data = inputs.get("coref", {}) entities_data = inputs.get("entities", {}) link_data = inputs.get("link", {}) sentences = base_data.get("sentences", []) coreferences = coref_data.get("coreferences", []) # Build token lookup: (sent_idx, tok_idx) -> text token_lookup = {} for sent in sentences: for tok in sent["tokens"]: token_lookup[(sent["idx"], tok["idx"])] = tok["text"] # Build entity lookup from links # Prefer KB IDs when available, fall back to discourse IDs entity_lookup = {} for link in link_data.get("links", []): m = tuple(link["mention"]) entity_lookup[m] = link["kb_id"] # Use KB ID for ent in link_data.get("unlinked", []): m = tuple(ent["mention"]) entity_lookup[m] = ent["discourse_id"] # Use discourse ID # Build quantifier lookup: (sent_idx, tok_idx) -> var_name quant_lookup = {} var_map = {} for q in entities_data.get("quantifiers", []): m = tuple(q["mention"]) quant_lookup[m] = q["var"] var_map[m] = q["var"] # Extend var_map with coreferences for coref in coreferences: a = tuple(coref["a"]) b = tuple(coref["b"]) if a in var_map: var_map[b] = var_map[a] elif b in var_map: var_map[a] = var_map[b] lines = [] # Emit entity declarations for unlinked entities only # (KB entities are already declared in the KB) unlinked = link_data.get("unlinked", []) if unlinked: lines.append("# New entities (not in KB)") for ent in unlinked: lines.append(f"entity {ent['discourse_id']} : {ent.get('discourse_type', 'entity')}") lines.append("") # Note linked entities linked = link_data.get("links", []) if linked: lines.append("# Linked to KB") for link in linked: lines.append(f"# {link['discourse_id']} → {link['kb_id']}") lines.append("") # Collect quantifier variable types var_types = [] for q in entities_data.get("quantifiers", []): var_types.append((q["var"], "entity")) # Process each sentence for args_sent in args_data.get("sentences", []): sent_idx = args_sent["sentence_idx"] # Find matching clauses sentence clauses_sent = None for cs in clauses_data.get("sentences", []): if cs["sentence_idx"] == sent_idx: clauses_sent = cs break if not clauses_sent: continue clauses = clauses_sent.get("clauses", []) # Check if this is a rule has_antecedent = any(c.get("label") == "antecedent" for c in clauses) has_consequent = any(c.get("label") == "consequent" for c in clauses) is_rule = var_types and has_antecedent and has_consequent if is_rule: lines.append(f"# Sentence {sent_idx}: Rule") antecedent_clause = None consequent_clause = None for clause_args in args_sent.get("clauses", []): if clause_args.get("clause_label") == "antecedent": antecedent_clause = clause_args elif clause_args.get("clause_label") == "consequent": consequent_clause = clause_args if antecedent_clause and consequent_clause: premise = self._build_predicate(sent_idx, antecedent_clause, token_lookup, var_map, entity_lookup) conclusion = self._build_predicate(sent_idx, consequent_clause, token_lookup, var_map, entity_lookup) vars_str = ", ".join(f"{v}:{t}" for v, t in var_types) lines.append(f"rule [{vars_str}]: {premise} -> {conclusion}") else: lines.append(f"# Sentence {sent_idx}: Propositions") for clause_args in args_sent.get("clauses", []): pred = 
# 📄 file contents: /Users/greg/mono/logic/world/src/world/core/layers/runner.py

```
"""
Layer execution runner - works on runs with KB from store.
"""

from world.core.layers import get_layer, resolve_dependencies, LayerResult


class LayerRunner:
    """Runs layers on a run workspace."""

    def __init__(self, doc_store, run_store, kb_store=None, context: dict = None):
        self.doc_store = doc_store
        self.run_store = run_store
        self.kb_store = kb_store
        self.context = context or {}

    def run(self, run_id: str, layer_ids: list[str], force: bool = False) -> dict[str, LayerResult]:
        """Run specified layers on a run workspace."""
        run = self.run_store.get(run_id)
        if not run:
            return {"_error": LayerResult(False, None, "run not found")}

        # Load KB and add to context
        if self.kb_store:
            kb = self.kb_store.get(run.kb_id)
            if kb:
                self.context["kb"] = kb

        all_layers = resolve_dependencies(layer_ids)
        results = {}

        for lid in all_layers:
            layer = get_layer(lid)

            if not force and self.run_store.has_data(run_id, lid):
                data = self.run_store.get_data(run_id, lid)
                results[lid] = LayerResult(True, data, "cached")
                continue

            inputs = {}
            missing = []
            for dep in layer.depends_on:
                if self.run_store.has_data(run_id, dep):
                    inputs[dep] = self.run_store.get_data(run_id, dep)
                else:
                    missing.append(dep)

            if missing:
                results[lid] = LayerResult(False, None, f"missing deps: {missing}")
                continue

            doc = self.doc_store.get(run.doc_id)
            if doc:
                inputs["_doc"] = doc

            try:
                result = layer.process(inputs, self.context)
                if result.success:
                    self.run_store.set_data(run_id, lid, result.data)
                results[lid] = result
            except Exception as e:
                results[lid] = LayerResult(False, None, f"error: {e}")

        return results

    def get_dsl(self, run_id: str, layer_id: str) -> str | None:
        """Get layer data formatted as DSL."""
        layer = get_layer(layer_id)
        data = self.run_store.get_data(run_id, layer_id)
        if data is None:
            return None
        return layer.format_dsl(data)
```
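A hypothetical end-to-end invocation of the runner. `doc_store`, `run_store`, `kb_store`, and `run_id` are placeholders for objects from the application's storage layer (their constructors are not shown in these files); the client in `context` is the standard openai-python v1 client that the layers call via `chat.completions.create`.

```
# Sketch: run the args layer (and its dependencies) on an existing run workspace.
from openai import OpenAI
from world.core.layers.runner import LayerRunner  # module path assumed

# Importing the layer modules registers them via register_layer(...).
import world.core.layers.args, world.core.layers.base, world.core.layers.clauses  # noqa: F401

# doc_store, run_store, kb_store, run_id are placeholders from the surrounding app.
runner = LayerRunner(doc_store, run_store, kb_store, context={"openai": OpenAI()})
results = runner.run(run_id, ["args"])         # "base" and "clauses" resolve automatically
for layer_id, result in results.items():
    print(layer_id, "ok" if result.success else "failed")
print(runner.get_dsl(run_id, "args"))          # inspect the .args DSL
```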