| |
| |
| """ |
| STRUCTURAL INQUIRY SYSTEM v2.5 |
| Engineering-Focused Knowledge Discovery with Concrete Improvements |
| """ |
|
|
| from enum import Enum |
| from dataclasses import dataclass, field |
| from typing import List, Dict, Any, Optional, Tuple, Mapping, Callable |
| import hashlib |
| from datetime import datetime |
| from types import MappingProxyType |
| import numpy as np |
|
|
| |
# Console display glyphs (used by main() and run_deterministic_test() output).
KNOWLEDGE_NODE = "●"  # prefixes knowledge/summary lines; also embedded in structural descriptions
PATTERN_RECOGNITION = "⟁"  # prefixes research-session headers
INQUIRY_MARKER = "?"  # printed when the determinism check fails
VALIDATION_SYMBOL = "✓"  # printed for successful checks
|
|
| |
|
|
class KnowledgeStateType(Enum):
    """Knowledge state types with clear semantics.

    Selection rules live in
    IntegratedKnowledgeDiscovery._convert_to_knowledge_state; members not
    mentioned there are not assigned anywhere in this file — presumably for
    external callers (NOTE(review): confirm before removing).
    """
    # Chosen when the result's structural_description contains "pattern".
    PATTERN_DETECTION = "pattern_detection"
    # Default classification when no other rule matches.
    DATA_CORRELATION = "data_correlation"
    CONTEXTUAL_ALIGNMENT = "contextual_alignment"
    METHODOLOGICAL_STRUCTURE = "methodological_structure"
    # Chosen when confidence_score < 0.6 (checked first).
    SOURCE_VERIFICATION = "source_verification"
    TEMPORAL_CONSISTENCY = "temporal_consistency"
    # Chosen when the result spans more than two knowledge domains.
    CROSS_DOMAIN_SYNTHESIS = "cross_domain_synthesis"
    KNOWLEDGE_GAP_IDENTIFICATION = "knowledge_gap_identification"
|
|
| @dataclass(frozen=True) |
| class KnowledgeState: |
| """Immutable knowledge state with provenance tracking""" |
| state_id: str |
| state_type: KnowledgeStateType |
| confidence_score: float |
| confidence_provenance: str |
| methodological_rigor: float |
| data_patterns: Tuple[float, ...] |
| knowledge_domains: Tuple[str, ...] |
| temporal_markers: Tuple[str, ...] |
| research_constraints: Tuple[str, ...] |
| structural_description: str |
| validation_signature: str |
| state_hash: str = field(init=False) |
| |
| def __post_init__(self): |
| hash_input = f"{self.state_id}:{self.state_type.value}:{self.confidence_score}:" |
| hash_input += f"{self.confidence_provenance}:{self.methodological_rigor}:" |
| hash_input += ":".join(str(v) for v in self.data_patterns[:10]) |
| hash_input += ":".join(self.knowledge_domains) |
| |
| state_hash = hashlib.sha3_512(hash_input.encode()).hexdigest()[:32] |
| object.__setattr__(self, 'state_hash', state_hash) |
|
|
| |
|
|
class InquiryCategory(Enum):
    """Inquiry categories with clear prioritization semantics.

    Each category is emitted by one heuristic of DefaultInquiryAnalyzer;
    categories without a note are not produced in this file (NOTE(review):
    presumably for external analyzers — confirm).
    """
    # Emitted when confidence_score < 0.7.
    CONFIDENCE_DISCREPANCY_ANALYSIS = "confidence_discrepancy_analysis"
    # Emitted when methodological_rigor < 0.65.
    METHODOLOGICAL_CONSISTENCY_CHECK = "methodological_consistency_check"
    # Emitted when fewer than 8 data patterns are present.
    PATTERN_ANOMALY_DETECTION = "pattern_anomaly_detection"
    # Emitted when fewer than 3 temporal markers are present.
    TEMPORAL_ALIGNMENT_VALIDATION = "temporal_alignment_validation"
    SOURCE_RELIABILITY_ASSESSMENT = "source_reliability_assessment"
    # Emitted when more than 2 knowledge domains are involved.
    CROSS_REFERENCE_VALIDATION = "cross_reference_validation"
    # Fallback category when no other heuristic fires.
    KNOWLEDGE_COMPLETENESS_EVALUATION = "knowledge_completeness_evaluation"
|
|
| |
|
|
@dataclass
class AnalysisResult:
    """Structured analysis result for inquiry generation.

    Converted from a hand-written ``__init__`` to a dataclass: the field
    order and keyword constructor are unchanged, and equality/repr come for
    free.  Instances are produced by InquiryAnalyzer implementations and
    consumed by InquiryArtifact.create.
    """
    category: "InquiryCategory"
    basis_code: str                        # key into INQUIRY_BASIS_TEMPLATES
    basis_kwargs: Dict[str, Any]           # format() arguments for the template
    verification_requirements: List[str]
    investigation_confidence: float        # 0..1; gates artifact creation
    research_completion_estimate: float    # 0..1
    priority_score: float                  # later clamped to int 1..10
|
|
class InquiryAnalyzer:
    """Protocol for pluggable analysis.

    Implementations (see DefaultInquiryAnalyzer) inspect a KnowledgeState
    and return candidate AnalysisResults; InquiryGenerator filters those by
    investigation_confidence before turning them into artifacts.
    """
    def analyze(self, state: KnowledgeState) -> List[AnalysisResult]:
        """Analyze state and return multiple potential inquiries"""
        raise NotImplementedError
|
|
| |
|
|
class DefaultInquiryAnalyzer(InquiryAnalyzer):
    """Default analyzer that generates multiple inquiry candidates.

    Each heuristic inspects one facet of a KnowledgeState and, when its
    threshold trips, contributes one AnalysisResult.  A baseline result is
    emitted when no heuristic fires, so ``analyze`` never returns an empty
    list.
    """

    # Number of data patterns a well-formed state is expected to carry.
    # Bug fix: _pattern_analysis previously divided by a hard-coded 10
    # while the trigger threshold and the reported "expected_patterns"
    # were both 8; all three now share this constant.
    EXPECTED_PATTERNS = 8
    # Number of temporal markers a well-formed state is expected to carry.
    EXPECTED_TEMPORAL_MARKERS = 3

    def __init__(self, basis_templates: Dict[str, Dict[str, Any]]):
        # NOTE(review): stored but not read by this class; the templates
        # are rendered later by InquiryArtifact.create.  Kept for interface
        # symmetry and so callers can introspect analyzer.basis_templates.
        self.basis_templates = basis_templates

    def analyze(self, state: KnowledgeState) -> List[AnalysisResult]:
        """Generate inquiry candidates from ``state`` (never empty)."""
        results = []

        if state.confidence_score < 0.7:
            results.append(self._confidence_analysis(state))

        if state.methodological_rigor < 0.65:
            results.append(self._methodological_analysis(state))

        if len(state.data_patterns) < self.EXPECTED_PATTERNS:
            results.append(self._pattern_analysis(state))

        if len(state.temporal_markers) < self.EXPECTED_TEMPORAL_MARKERS:
            results.append(self._temporal_analysis(state))

        if len(state.knowledge_domains) > 2:
            results.append(self._cross_domain_analysis(state))

        # Guarantee at least one candidate for well-formed states.
        if not results:
            results.append(self._default_analysis(state))

        return results

    def _confidence_analysis(self, state: KnowledgeState) -> AnalysisResult:
        """Flag states whose confidence falls short of the 0.8 baseline."""
        # Deficiency relative to a 0.8 baseline, floored at 0.1.
        # NOTE(review): the trigger threshold is 0.7 and the reported
        # "expected" is 75.0 — three distinct baselines; preserved as-is.
        confidence_factor = max(0.1, 0.8 - state.confidence_score)
        return AnalysisResult(
            category=InquiryCategory.CONFIDENCE_DISCREPANCY_ANALYSIS,
            basis_code="CONFIDENCE_ANOMALY_INVESTIGATION",
            basis_kwargs={
                "score": state.confidence_score * 100,
                "expected": 75.0,
                "provenance": state.confidence_provenance
            },
            verification_requirements=[
                "statistical_reanalysis",
                "source_review",
                "methodology_audit"
            ],
            investigation_confidence=confidence_factor,
            research_completion_estimate=self._calculate_completion_estimate(3, confidence_factor),
            priority_score=self._calculate_priority_score(confidence_factor, 0.9)
        )

    def _methodological_analysis(self, state: KnowledgeState) -> AnalysisResult:
        """Flag states with weak methodological rigor."""
        rigor_factor = max(0.1, 0.7 - state.methodological_rigor)
        return AnalysisResult(
            category=InquiryCategory.METHODOLOGICAL_CONSISTENCY_CHECK,
            basis_code="METHODOLOGICAL_CONSISTENCY_QUESTION",
            basis_kwargs={
                "rigor": state.methodological_rigor * 100,
                "method_type": "research_protocol"
            },
            verification_requirements=[
                "protocol_review",
                "reproducibility_check",
                "peer_validation"
            ],
            investigation_confidence=rigor_factor,
            research_completion_estimate=self._calculate_completion_estimate(3, rigor_factor),
            priority_score=self._calculate_priority_score(rigor_factor, 0.8)
        )

    def _pattern_analysis(self, state: KnowledgeState) -> AnalysisResult:
        """Flag states carrying fewer data patterns than expected."""
        # Completeness ratio in [0, 1); < 1 because the trigger condition
        # guarantees len(data_patterns) < EXPECTED_PATTERNS.
        pattern_factor = len(state.data_patterns) / float(self.EXPECTED_PATTERNS)
        return AnalysisResult(
            category=InquiryCategory.PATTERN_ANOMALY_DETECTION,
            basis_code="PATTERN_DEVIATION_ANALYSIS",
            basis_kwargs={
                "pattern_completeness": pattern_factor * 100,
                "expected_patterns": self.EXPECTED_PATTERNS
            },
            verification_requirements=[
                "pattern_completeness_check",
                "data_collection_review",
                "statistical_validation"
            ],
            investigation_confidence=1.0 - pattern_factor,
            research_completion_estimate=self._calculate_completion_estimate(3, pattern_factor),
            priority_score=self._calculate_priority_score(1.0 - pattern_factor, 0.7)
        )

    def _temporal_analysis(self, state: KnowledgeState) -> AnalysisResult:
        """Flag states with too few temporal markers."""
        temporal_factor = len(state.temporal_markers) / float(self.EXPECTED_TEMPORAL_MARKERS)
        return AnalysisResult(
            category=InquiryCategory.TEMPORAL_ALIGNMENT_VALIDATION,
            basis_code="TEMPORAL_CONSISTENCY_CHECK",
            basis_kwargs={
                "marker_count": len(state.temporal_markers),
                "expected_markers": self.EXPECTED_TEMPORAL_MARKERS
            },
            verification_requirements=[
                "temporal_sequence_verification",
                "chronological_consistency_check"
            ],
            investigation_confidence=1.0 - temporal_factor,
            research_completion_estimate=self._calculate_completion_estimate(2, temporal_factor),
            priority_score=self._calculate_priority_score(1.0 - temporal_factor, 0.6)
        )

    def _cross_domain_analysis(self, state: KnowledgeState) -> AnalysisResult:
        """Flag states spanning more than two knowledge domains."""
        # Saturates at 5 domains.
        domain_factor = min(1.0, len(state.knowledge_domains) / 5.0)
        return AnalysisResult(
            category=InquiryCategory.CROSS_REFERENCE_VALIDATION,
            basis_code="CROSS_DOMAIN_ALIGNMENT_CHECK",
            basis_kwargs={
                "domain_count": len(state.knowledge_domains),
                "domains": list(state.knowledge_domains)[:3]  # template shows at most 3
            },
            verification_requirements=[
                "cross_domain_correlation",
                "independent_verification"
            ],
            investigation_confidence=domain_factor,
            research_completion_estimate=self._calculate_completion_estimate(2, domain_factor),
            priority_score=self._calculate_priority_score(domain_factor, 0.5)
        )

    def _default_analysis(self, state: KnowledgeState) -> AnalysisResult:
        """Baseline result for states that trip no heuristic."""
        return AnalysisResult(
            category=InquiryCategory.KNOWLEDGE_COMPLETENESS_EVALUATION,
            basis_code="BASELINE_VERIFICATION",
            basis_kwargs={
                "confidence_score": state.confidence_score * 100,
                "rigor_score": state.methodological_rigor * 100
            },
            verification_requirements=["comprehensive_review"],
            investigation_confidence=0.3,
            research_completion_estimate=0.9,
            priority_score=2.0
        )

    def _calculate_completion_estimate(self, requirement_count: int, confidence: float) -> float:
        """Estimate research completion in [0, 0.95].

        More verification requirements shrink the base estimate
        geometrically; the supplied factor adds up to 0.4.
        """
        base = 0.5
        requirement_impact = 0.9 ** requirement_count
        confidence_impact = confidence * 0.4
        return min(0.95, base * requirement_impact + confidence_impact)

    def _calculate_priority_score(self, investigation_confidence: float, weight: float) -> float:
        """Scale confidence by heuristic weight onto a 0-10 scale."""
        base_score = investigation_confidence * weight
        return round(base_score * 10, 2)
|
|
| |
|
|
# Description templates rendered by InquiryArtifact.create via
# str.format(**basis_kwargs).  Each key must match a basis_code emitted by
# an analyzer, and each placeholder must match that analyzer's basis_kwargs;
# unknown codes fall back to a placeholder-free default in create().
INQUIRY_BASIS_TEMPLATES = {
    "CONFIDENCE_ANOMALY_INVESTIGATION": {
        "template": "Confidence score of {score}% ({provenance}) differs from expected baseline of {expected}%",
        "investigation_focus": "confidence_validation"
    },
    "METHODOLOGICAL_CONSISTENCY_QUESTION": {
        "template": "Methodological rigor rating of {rigor}% suggests review of {method_type} may be beneficial",
        "investigation_focus": "methodological_review"
    },
    "PATTERN_DEVIATION_ANALYSIS": {
        "template": "Pattern completeness at {pattern_completeness}% with {expected_patterns} expected patterns",
        "investigation_focus": "pattern_analysis"
    },
    "TEMPORAL_CONSISTENCY_CHECK": {
        "template": "Temporal markers: {marker_count} present, {expected_markers} expected",
        "investigation_focus": "temporal_validation"
    },
    "CROSS_DOMAIN_ALIGNMENT_CHECK": {
        "template": "Cross-domain analysis across {domain_count} domains: {domains}",
        "investigation_focus": "cross_domain_validation"
    },
    "BASELINE_VERIFICATION": {
        "template": "Baseline verification: confidence={confidence_score}%, rigor={rigor_score}%",
        "investigation_focus": "comprehensive_review"
    }
}
|
|
| |
|
|
| @dataclass(frozen=True) |
| class InquiryArtifact: |
| """Deterministic inquiry artifact with robust priority calculation""" |
| artifact_id: str |
| source_state_hash: str |
| inquiry_category: InquiryCategory |
| investigation_priority: int |
| knowledge_domains_involved: Tuple[str, ...] |
| basis_code: str |
| inquiry_description: str |
| verification_requirements: Tuple[str, ...] |
| investigation_confidence: float |
| research_completion_estimate: float |
| confidence_provenance: str |
| artifact_hash: str |
| creation_context: 'CreationContext' |
| |
| @classmethod |
| def create( |
| cls, |
| knowledge_state: KnowledgeState, |
| analysis_result: AnalysisResult, |
| basis_templates: Dict[str, Dict[str, Any]], |
| creation_context: 'CreationContext' |
| ) -> 'InquiryArtifact': |
| """Create inquiry artifact with deterministic hash""" |
| |
| |
| template_data = basis_templates.get(analysis_result.basis_code, {}) |
| description_template = template_data.get("template", "Analysis required") |
| inquiry_description = description_template.format(**analysis_result.basis_kwargs) |
| |
| |
| priority_value = max(1, min(10, int(round(analysis_result.priority_score)))) |
| |
| |
| hash_input = f"{knowledge_state.state_hash}:{analysis_result.category.value}:" |
| hash_input += f"{analysis_result.basis_code}:{priority_value}:" |
| hash_input += ":".join(analysis_result.verification_requirements) |
| hash_input += creation_context.context_hash |
| |
| artifact_hash = hashlib.sha3_512(hash_input.encode()).hexdigest()[:32] |
| artifact_id = f"inq_{artifact_hash[:16]}" |
| |
| return cls( |
| artifact_id=artifact_id, |
| source_state_hash=knowledge_state.state_hash, |
| inquiry_category=analysis_result.category, |
| investigation_priority=priority_value, |
| knowledge_domains_involved=knowledge_state.knowledge_domains, |
| basis_code=analysis_result.basis_code, |
| inquiry_description=inquiry_description, |
| verification_requirements=tuple(analysis_result.verification_requirements), |
| investigation_confidence=analysis_result.investigation_confidence, |
| research_completion_estimate=analysis_result.research_completion_estimate, |
| confidence_provenance=knowledge_state.confidence_provenance, |
| artifact_hash=artifact_hash, |
| creation_context=creation_context |
| ) |
| |
| def reference_information(self) -> Mapping[str, Any]: |
| """Immutable reference information""" |
| return MappingProxyType({ |
| "artifact_id": self.artifact_id, |
| "source_state": self.source_state_hash[:12], |
| "inquiry_category": self.inquiry_category.value, |
| "investigation_priority": self.investigation_priority, |
| "priority_semantics": self._priority_semantics(), |
| "knowledge_domains": list(self.knowledge_domains_involved), |
| "basis": { |
| "code": self.basis_code, |
| "description": self.inquiry_description, |
| "confidence_provenance": self.confidence_provenance |
| }, |
| "verification_requirements": list(self.verification_requirements), |
| "investigation_confidence": round(self.investigation_confidence, 3), |
| "research_completion_estimate": round(self.research_completion_estimate, 3), |
| "artifact_hash": self.artifact_hash, |
| "creation_context": self.creation_context.reference_data() |
| }) |
| |
| def _priority_semantics(self) -> str: |
| """Document priority semantics""" |
| if self.investigation_priority >= 9: |
| return "critical_immediate_attention" |
| elif self.investigation_priority >= 7: |
| return "high_priority_review" |
| elif self.investigation_priority >= 5: |
| return "moderate_priority" |
| elif self.investigation_priority >= 3: |
| return "low_priority_backlog" |
| else: |
| return "informational_only" |
|
|
| |
|
|
@dataclass(frozen=True)
class CreationContext:
    """Immutable creation context.

    Captures when/where an inquiry batch was produced; ``context_hash`` is
    mixed into every InquiryArtifact hash.
    """
    system_version: str
    generation_timestamp: str
    research_environment: str
    deterministic_seed: Optional[int]
    context_hash: str = field(init=False)  # computed; excluded from __init__

    def __post_init__(self):
        # Bug fix: the previous code used `self.deterministic_seed or 'none'`,
        # which mapped the valid seed 0 to 'none' (falsy), making a
        # seed-0 context hash-collide with an unseeded one.  Use an explicit
        # None check, matching reference_data()'s `is not None`.
        seed_repr = 'none' if self.deterministic_seed is None else str(self.deterministic_seed)
        hash_input = f"{self.system_version}:{self.generation_timestamp}:"
        hash_input += f"{self.research_environment}:{seed_repr}"

        context_hash = hashlib.sha3_512(hash_input.encode()).hexdigest()[:32]
        # frozen dataclass: must bypass the frozen __setattr__.
        object.__setattr__(self, 'context_hash', context_hash)

    @classmethod
    def create(
        cls,
        research_environment: str = "knowledge_discovery_system",
        deterministic_seed: Optional[int] = None,
        clock_source: Callable[[], datetime] = datetime.now
    ) -> 'CreationContext':
        """Factory method with optional determinism.

        ``clock_source`` is injectable for tests; the default datetime.now
        is naive local time (NOTE(review): confirm whether UTC is intended).
        """
        return cls(
            system_version="structural_inquiry_v2.5",
            generation_timestamp=clock_source().isoformat(),
            research_environment=research_environment,
            deterministic_seed=deterministic_seed
        )

    def reference_data(self) -> Mapping[str, Any]:
        """Read-only summary view (embedded in artifact reference info)."""
        return MappingProxyType({
            "system_version": self.system_version,
            "generation_timestamp": self.generation_timestamp,
            "research_environment": self.research_environment,
            "deterministic_mode": self.deterministic_seed is not None,
            "context_hash": self.context_hash[:12]
        })
|
|
| |
|
|
class InquiryGenerator:
    """
    Deterministic inquiry generator with pluggable analysis.

    Feeds KnowledgeStates through an InquiryAnalyzer and materializes the
    sufficiently-confident results as InquiryArtifacts, accumulating them
    in ``generated_inquiries`` across calls.
    """

    def __init__(
        self,
        analyzer: Optional[InquiryAnalyzer] = None,
        creation_context: Optional[CreationContext] = None,
        deterministic_seed: Optional[int] = None
    ):
        self.analyzer = analyzer or DefaultInquiryAnalyzer(INQUIRY_BASIS_TEMPLATES)
        self.creation_context = creation_context or CreationContext.create(
            deterministic_seed=deterministic_seed
        )
        self.generated_inquiries: List[InquiryArtifact] = []

        # NOTE(review): seeds NumPy's *global* RNG as a side effect; any
        # other code sharing np.random in this process is affected.
        if deterministic_seed is not None:
            np.random.seed(deterministic_seed)

    def generate_inquiries(
        self,
        knowledge_states: Tuple[KnowledgeState, ...],
        confidence_threshold: float = 0.7
    ) -> Tuple[InquiryArtifact, ...]:
        """Generate inquiries from knowledge states.

        Consistency fix: artifacts are rendered with the analyzer's own
        ``basis_templates`` when it exposes that attribute (the default
        analyzer does), instead of always using the module-level table —
        previously a custom analyzer's templates were silently ignored.
        """
        # Fall back to the global table for analyzers without templates.
        basis_templates = getattr(self.analyzer, "basis_templates", INQUIRY_BASIS_TEMPLATES)

        inquiries = []
        for state in knowledge_states:
            for result in self.analyzer.analyze(state):
                # Only confident-enough candidates become artifacts.
                if result.investigation_confidence >= confidence_threshold:
                    inquiry = InquiryArtifact.create(
                        knowledge_state=state,
                        analysis_result=result,
                        basis_templates=basis_templates,
                        creation_context=self.creation_context
                    )
                    inquiries.append(inquiry)
                    self.generated_inquiries.append(inquiry)

        return tuple(inquiries)
|
|
| |
|
|
class ResearchSystem:
    """Abstract research system interface.

    IntegratedKnowledgeDiscovery isinstance-checks its research_system
    argument against this class; see ConcreteResearchSystem for an
    implementation.
    """

    async def research(self, topic: str, **kwargs) -> Dict[str, Any]:
        """Conduct research on topic (must be implemented).

        Implementations return a result dict; keys consumed downstream
        include confidence_score, confidence_provenance,
        methodological_rigor, knowledge_domains, content_hash, timestamp,
        structural_description, data_patterns and source_references.
        """
        raise NotImplementedError
|
|
| |
|
|
class IntegratedKnowledgeDiscovery:
    """
    Integrated system with clear async boundaries and determinism.

    Wraps a concrete ResearchSystem, converts each research result into an
    immutable KnowledgeState, and derives InquiryArtifacts from it via an
    InquiryGenerator.  With ``deterministic_seed`` set, identical inputs
    yield identical states and inquiries — including across processes (see
    the hash fix in _convert_to_knowledge_state).
    """

    def __init__(
        self,
        research_system: ResearchSystem,
        deterministic_seed: Optional[int] = None
    ):
        """
        Initialize with concrete research system

        Args:
            research_system: Must implement ResearchSystem interface
            deterministic_seed: Optional seed for reproducible results

        Raises:
            TypeError: if research_system is not a ResearchSystem.
        """
        if not isinstance(research_system, ResearchSystem):
            raise TypeError("research_system must implement ResearchSystem interface")

        self.research_system = research_system
        self.deterministic_seed = deterministic_seed
        self.inquiry_generator = InquiryGenerator(deterministic_seed=deterministic_seed)
        self.discovery_history: List[Dict[str, Any]] = []

    async def conduct_research_with_inquiries(
        self,
        research_topic: str,
        confidence_threshold: float = 0.7,
        **research_kwargs
    ) -> Dict[str, Any]:
        """Conduct research and generate knowledge inquiries.

        Returns a summary dict; the full artifacts and raw result are
        appended to ``self.discovery_history``.
        """
        # Async boundary: the only awaited call is the research itself;
        # everything after is synchronous, deterministic post-processing.
        research_result = await self.research_system.research(research_topic, **research_kwargs)

        knowledge_state = self._convert_to_knowledge_state(research_result)

        inquiry_artifacts = self.inquiry_generator.generate_inquiries(
            (knowledge_state,),
            confidence_threshold
        )

        inquiry_collection = {
            # Collection id is derived from the (deterministic) state hash.
            "collection_id": f"inq_coll_{hashlib.sha256(knowledge_state.state_hash.encode()).hexdigest()[:16]}",
            "research_topic": research_topic,
            "knowledge_state_hash": knowledge_state.state_hash[:12],
            "inquiry_count": len(inquiry_artifacts),
            # NOTE(review): utcnow() is naive and deprecated since 3.12;
            # kept to preserve the existing timestamp format.
            "generation_timestamp": datetime.utcnow().isoformat(),
            "confidence_threshold": confidence_threshold,
            "deterministic_mode": self.deterministic_seed is not None,
            "inquiries": [i.reference_information() for i in inquiry_artifacts]
        }

        # Full record (unbounded growth — callers own the lifecycle).
        self.discovery_history.append({
            "research_topic": research_topic,
            "research_result": research_result,
            "knowledge_state": knowledge_state,
            "inquiry_collection": inquiry_collection,
            "inquiry_artifacts": inquiry_artifacts
        })

        return {
            "research_topic": research_topic,
            "research_summary": {
                "confidence_score": research_result.get("confidence_score", 0.5),
                "methodological_rigor": research_result.get("methodological_rigor", 0.5),
                "domains": research_result.get("knowledge_domains", [])
            },
            "inquiry_generation": {
                "inquiries_generated": len(inquiry_artifacts),
                "inquiry_collection_id": inquiry_collection["collection_id"],
                "priority_distribution": self._summarize_priorities(inquiry_artifacts),
                "confidence_threshold_met": len(inquiry_artifacts) > 0
            }
        }

    def _convert_to_knowledge_state(
        self,
        research_result: Dict[str, Any]
    ) -> KnowledgeState:
        """Convert research result to knowledge state."""

        confidence_score = research_result.get("confidence_score", 0.5)
        confidence_provenance = research_result.get(
            "confidence_provenance",
            "derived_from_research"
        )

        # Classification rules, checked in priority order.
        if confidence_score < 0.6:
            state_type = KnowledgeStateType.SOURCE_VERIFICATION
        elif "pattern" in str(research_result.get("structural_description", "")).lower():
            state_type = KnowledgeStateType.PATTERN_DETECTION
        elif len(research_result.get("knowledge_domains", [])) > 2:
            state_type = KnowledgeStateType.CROSS_DOMAIN_SYNTHESIS
        else:
            state_type = KnowledgeStateType.DATA_CORRELATION

        if self.deterministic_seed is not None:
            # Bug fix: the pattern seed was previously derived via the
            # builtin hash(), whose output for strings is randomized per
            # process (PYTHONHASHSEED), silently breaking the promised
            # cross-run determinism.  A SHA-256 digest is process-stable;
            # 4 bytes keeps the seed < 2**32 as np.random.seed requires.
            seed_key = f"{self.deterministic_seed}:{research_result.get('content_hash', '')}"
            pattern_seed = int.from_bytes(
                hashlib.sha256(seed_key.encode()).digest()[:4], "big"
            )
            np.random.seed(pattern_seed)  # reseeds NumPy's global RNG
            data_patterns = tuple(np.random.randn(8).tolist())
        else:
            # Prefer the patterns the research system provided; otherwise a
            # fixed sine ramp (0.785 ~ pi/4) marks "no real data".
            provided_patterns = research_result.get("data_patterns", [])
            data_patterns = tuple(provided_patterns[:8]) if provided_patterns else tuple(np.sin(np.arange(8) * 0.785).tolist())

        structural_description = self._generate_structural_description(research_result)

        validation_signature = hashlib.sha3_512(
            f"{research_result.get('content_hash', '')}:{self.deterministic_seed or 'stochastic'}".encode()
        ).hexdigest()[:32]

        return KnowledgeState(
            state_id=f"knowledge_state_{research_result.get('content_hash', 'unknown')[:12]}",
            state_type=state_type,
            confidence_score=confidence_score,
            confidence_provenance=confidence_provenance,
            methodological_rigor=research_result.get("methodological_rigor", 0.5),
            data_patterns=data_patterns,
            knowledge_domains=tuple(research_result.get("knowledge_domains", ["general"])),
            temporal_markers=(
                research_result.get("timestamp", ""),
                datetime.utcnow().isoformat()
            ),
            research_constraints=self._extract_constraints(research_result),
            structural_description=structural_description,
            validation_signature=validation_signature
        )

    def _generate_structural_description(
        self,
        research_result: Dict[str, Any]
    ) -> str:
        """Generate a short human-readable description of notable facets."""
        components = []

        confidence = research_result.get("confidence_score", 0.5)
        provenance = research_result.get("confidence_provenance", "unstated")

        # Only the extremes are called out; mid-range confidence is silent.
        if confidence < 0.6:
            components.append(f"Low confidence ({confidence:.2f}) from {provenance}")
        elif confidence > 0.8:
            components.append(f"High confidence ({confidence:.2f}) from {provenance}")

        rigor = research_result.get("methodological_rigor", 0.5)
        if rigor < 0.6:
            components.append(f"Methodological rigor: {rigor:.2f}")

        domains = research_result.get("knowledge_domains", [])
        if len(domains) > 2:
            components.append(f"Cross-domain: {len(domains)} domains")

        if not components:
            components.append("Standard research structure")

        return f"{KNOWLEDGE_NODE} " + "; ".join(components)

    def _extract_constraints(
        self,
        research_result: Dict[str, Any]
    ) -> Tuple[str, ...]:
        """Derive verification constraints from result quality signals."""
        constraints = []

        if research_result.get("confidence_score", 0) < 0.7:
            constraints.append("confidence_verification_needed")

        if research_result.get("methodological_rigor", 0) < 0.6:
            constraints.append("methodology_review_recommended")

        if not research_result.get("source_references", []):
            constraints.append("source_corroboration_required")

        if not constraints:
            constraints.append("standard_verification_protocol")

        return tuple(constraints)

    def _summarize_priorities(
        self,
        inquiry_artifacts: Tuple[InquiryArtifact, ...]
    ) -> Dict[str, Any]:
        """Summarize inquiry priorities with clear semantics."""
        if not inquiry_artifacts:
            return {"message": "No inquiries generated", "priority_levels": {}}

        priority_summary = {}
        for artifact in inquiry_artifacts:
            priority = artifact.investigation_priority
            if priority not in priority_summary:
                priority_summary[priority] = {
                    "count": 0,
                    "domains": set(),  # deduplicated, converted below
                    "semantics": artifact._priority_semantics()
                }

            priority_summary[priority]["count"] += 1
            priority_summary[priority]["domains"].update(artifact.knowledge_domains_involved)

        # Sets are not JSON-serializable; expose plain lists.
        for priority in priority_summary:
            priority_summary[priority]["domains"] = list(priority_summary[priority]["domains"])

        return {
            "total_priorities": len(priority_summary),
            "highest_priority": max(priority_summary.keys()),
            "priority_distribution": priority_summary
        }

    def get_statistics(self) -> Dict[str, Any]:
        """Get system statistics across all sessions so far."""
        total_inquiries = len(self.inquiry_generator.generated_inquiries)

        category_counts = {}
        for inquiry in self.inquiry_generator.generated_inquiries:
            category = inquiry.inquiry_category.value
            category_counts[category] = category_counts.get(category, 0) + 1

        # Guard the means against an empty artifact list.
        if total_inquiries > 0:
            avg_confidence = np.mean([i.investigation_confidence for i in self.inquiry_generator.generated_inquiries])
            avg_priority = np.mean([i.investigation_priority for i in self.inquiry_generator.generated_inquiries])
        else:
            avg_confidence = 0.0
            avg_priority = 0.0

        return {
            "system": "Integrated Knowledge Discovery v2.5",
            "research_sessions": len(self.discovery_history),
            "total_inquiries_generated": total_inquiries,
            "category_distribution": category_counts,
            "average_investigation_confidence": round(float(avg_confidence), 3),
            "average_investigation_priority": round(float(avg_priority), 1),
            "deterministic_mode": self.deterministic_seed is not None,
            "engineering_properties": {
                "immutable_data_structures": True,
                "deterministic_hashes": True,
                "pluggable_analyzers": True,
                "clear_async_boundaries": True,
                "priority_semantics_documented": True
            }
        }
|
|
| |
|
|
class ConcreteResearchSystem(ResearchSystem):
    """Example research system with proper async implementation."""

    def __init__(self, deterministic_seed: Optional[int] = None):
        self.deterministic_seed = deterministic_seed
        # Seeds NumPy's global RNG so np.random draws below are
        # reproducible within a process.
        if deterministic_seed is not None:
            np.random.seed(deterministic_seed)

    async def research(self, topic: str, **kwargs) -> Dict[str, Any]:
        """Conduct research (simulated for example).

        Bug fix: the deterministic branch previously derived topic_hash
        from the builtin hash(), whose string output is randomized per
        process (PYTHONHASHSEED), so "deterministic" scores differed
        between runs.  A SHA-256 digest is stable across processes.
        """
        import asyncio
        await asyncio.sleep(0.1)  # simulate I/O latency

        if self.deterministic_seed is not None:
            # Process-stable value in [0, 1000).
            topic_hash = int(hashlib.sha256(topic.encode()).hexdigest(), 16) % 1000
            confidence = 0.5 + (topic_hash % 500) / 1000
            rigor = 0.4 + (topic_hash % 600) / 1000
        else:
            # Stochastic scores: confidence in [0.5, 0.8), rigor in [0.4, 0.8).
            confidence = np.random.random() * 0.3 + 0.5
            rigor = np.random.random() * 0.4 + 0.4

        return {
            "topic": topic,
            "content_hash": hashlib.sha256(topic.encode()).hexdigest()[:32],
            "confidence_score": confidence,
            "confidence_provenance": "simulated_analysis",
            "methodological_rigor": rigor,
            "knowledge_domains": self._identify_domains(topic),
            "structural_description": f"Research on {topic}",
            "timestamp": datetime.utcnow().isoformat(),
            "data_patterns": np.sin(np.arange(10) * 0.628).tolist(),
            "source_references": [f"ref_{i}" for i in range(np.random.randint(1, 4))]
        }

    def _identify_domains(self, topic: str) -> List[str]:
        """Identify domains from topic via simple keyword matching."""
        domains = []
        topic_lower = topic.lower()

        if any(word in topic_lower for word in ["quantum", "physics"]):
            domains.append("physics")
        if any(word in topic_lower for word in ["history", "ancient"]):
            domains.append("history")
        if any(word in topic_lower for word in ["consciousness", "mind"]):
            domains.append("psychology")
        if any(word in topic_lower for word in ["pattern", "analysis"]):
            domains.append("mathematics")

        # Fallback so callers always get at least one domain.
        return domains if domains else ["interdisciplinary"]
|
|
| |
|
|
def run_deterministic_test() -> bool:
    """Test deterministic reproducibility.

    Builds two independently seeded stacks, runs the same topic through
    each, and compares the generated inquiry counts.

    Cleanup: uses asyncio.run() instead of manually creating, installing
    and closing an event loop (the old pattern leaks the loop on error and
    is discouraged in modern asyncio).

    Returns:
        True when both stacks produced the same number of inquiries.
    """
    print("Testing deterministic reproducibility...")

    import asyncio

    def build_system() -> IntegratedKnowledgeDiscovery:
        # Each stack gets its own seeded research system and seed.
        return IntegratedKnowledgeDiscovery(
            ConcreteResearchSystem(deterministic_seed=42),
            deterministic_seed=42
        )

    result1 = asyncio.run(build_system().conduct_research_with_inquiries("Test topic"))
    result2 = asyncio.run(build_system().conduct_research_with_inquiries("Test topic"))

    inquiries1 = result1["inquiry_generation"]["inquiries_generated"]
    inquiries2 = result2["inquiry_generation"]["inquiries_generated"]

    print(f" System 1 inquiries: {inquiries1}")
    print(f" System 2 inquiries: {inquiries2}")
    print(f" Results identical: {inquiries1 == inquiries2}")

    return inquiries1 == inquiries2
|
|
| |
|
|
async def main():
    """Demonstrate the system.

    Runs the determinism self-check, then three stochastic research
    sessions, and finally prints aggregate statistics.
    """
    print(f"""
{'='*70}
STRUCTURAL INQUIRY SYSTEM v2.5
Engineering-Focused Knowledge Discovery
{'='*70}
""")

    # Self-check: two seed-42 stacks must agree.
    if run_deterministic_test():
        print(f"\n{VALIDATION_SYMBOL} Deterministic reproducibility verified")
    else:
        print(f"\n{INQUIRY_MARKER} Non-deterministic behavior detected")

    # Unseeded (stochastic) demo system.
    research_system = ConcreteResearchSystem()
    discovery_system = IntegratedKnowledgeDiscovery(research_system)

    topics = [
        "Quantum pattern analysis techniques",
        "Historical methodology consistency",
        "Cross-domain verification protocols"
    ]

    for i, topic in enumerate(topics, 1):
        print(f"\n{PATTERN_RECOGNITION} RESEARCH SESSION {i}: {topic}")
        print(f"{'-'*60}")

        result = await discovery_system.conduct_research_with_inquiries(
            topic,
            confidence_threshold=0.6
        )

        inquiries = result["inquiry_generation"]["inquiries_generated"]
        priorities = result["inquiry_generation"]["priority_distribution"]

        print(f" {VALIDATION_SYMBOL} Research completed")
        print(f" {KNOWLEDGE_NODE} Inquiries generated: {inquiries}")

        if inquiries > 0:
            for priority, data in priorities.get("priority_distribution", {}).items():
                semantics = data.get("semantics", "unknown")
                print(f" Priority {priority} ({semantics}): {data['count']} inquiries")

    stats = discovery_system.get_statistics()
    print(f"\n{'='*70}")
    print("SYSTEM STATISTICS")
    print(f"{'='*70}")

    print(f"\nResearch sessions: {stats['research_sessions']}")
    print(f"Total inquiries: {stats['total_inquiries_generated']}")
    # Cleanup: was an f-string with no placeholders (flake8 F541).
    print("\nEngineering properties:")
    for prop, value in stats["engineering_properties"].items():
        status = "✓" if value else "✗"
        print(f" {status} {prop}: {value}")
|
|
if __name__ == "__main__":
    import asyncio

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Graceful message on Ctrl-C instead of a traceback.
        print(f"\n\n{KNOWLEDGE_NODE} System shutdown complete.")