Skip to main content

GraphoraClient

The GraphoraClient class is the main entry point for interacting with the Graphora API. It provides methods for all major API endpoints, including ontology management, document transformation, graph merging, and graph manipulation.

Initialization

import os
from graphora import GraphoraClient

client = GraphoraClient(
    auth_token=os.environ["GRAPHORA_AUTH_TOKEN"],
    user_id="optional-local-id",  # optional helper for local bookkeeping
    timeout=60
)

Parameters

ParameterTypeDescriptionDefault
base_urlstrBase URL of the Graphora APINone (defaults to GRAPHORA_API_URL or https://api.graphora.io)
auth_tokenstrClerk-issued bearer token for authenticationNone (falls back to GRAPHORA_AUTH_TOKEN)
user_idstrOptional identifier for client-side loggingNone
timeoutintRequest timeout in seconds60

Environment Variables

The client can also use environment variables for configuration:
  • GRAPHORA_AUTH_TOKEN: Clerk bearer token (used when auth_token is omitted)
  • GRAPHORA_USER_ID: Optional user identifier (used only for client logs)
  • GRAPHORA_API_URL: Optional API URL override (used when base_url is omitted)

Ontology Methods

register_ontology

Register, validate and upload an ontology definition.
def register_ontology(self, ontology_yaml: str) -> OntologyResponse:
    """
    Register, validate and upload an ontology definition.
    
    Args:
        ontology_yaml: Ontology definition in YAML format
        
    Returns:
        OntologyResponse with the ID of the validated ontology
    """

Example

# Example ontology in the new format
ontology_yaml = """
version: 1
entities:
  Company:
    properties:
      name:
        type: string
        description: Name of the Company
        unique: true
        required: true
      cik:
        type: string
        description: Cusip Identifier for Company
        unique: true
    relationships:
      HAS_BUSINESS:
        target: Business
        cardinality: many_to_one
"""
    
ontology_response = client.register_ontology(ontology_yaml)
ontology_id = ontology_response.id

get_ontology

Retrieve an ontology by ID.
def get_ontology(self, ontology_id: str) -> str:
    """
    Retrieve an ontology by ID.
    
    Args:
        ontology_id: ID of the ontology to retrieve
        
    Returns:
        Ontology YAML text
    """

Example

ontology_yaml = client.get_ontology("your-ontology-id")

with open("retrieved_ontology.yaml", "w") as f:
    f.write(ontology_yaml)

Transformation Methods

transform

Upload documents for processing.
def transform(
    self,
    ontology_id: str,
    files: List[Union[str, Path, BinaryIO]],
    metadata: Optional[List[DocumentMetadata]] = None
) -> TransformResponse:
    """
    Upload documents for processing.
    
    Args:
        ontology_id: ID of the ontology to use for transformation
        files: List of files to process (paths or file-like objects)
        metadata: Optional metadata for each document
        
    Returns:
        TransformResponse with the ID for tracking progress
    """

Example

from graphora.models import DocumentMetadata, DocumentType

# Create metadata (optional)
metadata = [
    DocumentMetadata(
        source="document1.pdf",
        document_type=DocumentType.PDF,
        tags=["example"],
        custom_metadata={"priority": "high"}
    )
]

# Transform documents
transform_response = client.transform(
    ontology_id="your-ontology-id",
    files=["document1.pdf", "document2.txt"],
    metadata=metadata
)

transform_id = transform_response.id

get_transform_status

Get the status of a transformation.
def get_transform_status(
    self,
    transform_id: str,
    include_metrics: bool = True
) -> TransformStatus:
    """
    Get the status of a transformation.
    
    Args:
        transform_id: ID of the transformation to check
        include_metrics: Whether to include resource metrics
        
    Returns:
        TransformStatus with current progress
    """

Example

status = client.get_transform_status("your-transform-id")
print(
    f"Overall: {status.overall_status} | Stage: {status.current_stage} | "
    f"Complete: {status.percentage_complete:.1f}%"
)

wait_for_transform

Wait for a transformation to complete.
def wait_for_transform(
    self,
    transform_id: str,
    timeout: int = 300,
    poll_interval: int = 5
) -> TransformStatus:
    """
    Wait for a transformation to complete.
    
    Args:
        transform_id: ID of the transformation to wait for
        timeout: Maximum time to wait in seconds
        poll_interval: Time between status checks in seconds
        
    Returns:
        Final TransformStatus
        
    Raises:
        GraphoraClientError: If the transformation fails or times out
    """

Example

try:
    final_status = client.wait_for_transform(
        transform_id="your-transform-id",
        timeout=600,  # 10 minutes
        poll_interval=10
    )
    print(f"Run finished with: {final_status.overall_status}")
except GraphoraClientError as e:
    print(f"Error: {str(e)}")

cleanup_transform

Clean up transformation data.
def cleanup_transform(self, transform_id: str) -> bool:
    """
    Clean up transformation data.
    
    Args:
        transform_id: ID of the transformation to clean up
        
    Returns:
        True if cleanup was successful
    """

Example

success = client.cleanup_transform("your-transform-id")
if success:
    print("Cleanup successful")
else:
    print("Cleanup failed")

get_transformed_graph

Retrieve graph data by transform ID.
def get_transformed_graph(
    self,
    transform_id: str,
    limit: int = 1000,
    skip: int = 0
) -> GraphResponse:
    """
    Retrieve graph data by transform ID.
    
    Args:
        transform_id: ID of the transformation
        limit: Maximum number of nodes to return
        skip: Number of nodes to skip for pagination
        
    Returns:
        GraphResponse with nodes and edges
    """

Example

graph = client.get_transformed_graph(
    transform_id="your-transform-id",
    limit=500,
    skip=0
)
print(f"Graph has {len(graph.nodes)} nodes and {len(graph.edges)} edges")

update_transform_graph

Save bulk modifications to the graph.
def update_transform_graph(
    self,
    transform_id: str,
    changes: SaveGraphRequest
) -> SaveGraphResponse:
    """
    Save bulk modifications to the graph.
    
    Args:
        transform_id: ID of the transformation
        changes: Batch of modifications to apply
        
    Returns:
        SaveGraphResponse with updated graph data
    """

Example

from graphora.models import SaveGraphRequest, NodeChange, EdgeChange

# Prepare changes
node_changes = [
    NodeChange(
        id=None,  # None for new nodes
        labels=["Person"],
        properties={"name": "John Smith"},
        is_deleted=False
    )
]

edge_changes = []

# Create the save request
save_request = SaveGraphRequest(
    nodes=node_changes,
    edges=edge_changes
)

# Save the changes
response = client.update_transform_graph(
    transform_id="your-transform-id",
    changes=save_request
)

# Get the updated graph
updated_graph = response.data

Merge Methods

start_merge

Start a new merge process.
def start_merge(
    self,
    session_id: str,
    transform_id: str,
    merge_id: Optional[str] = None
) -> MergeInitResponse:
    """
    Start a new merge process.
    
    Args:
        session_id: Session ID (ontology ID)
        transform_id: ID of the transformation to merge
        merge_id: Optional custom merge ID
        
    Returns:
        MergeInitResponse with the ID for tracking progress
    """

Example

merge = client.start_merge(
    session_id="your-ontology-id",
    transform_id="your-transform-id"
)

merge_id = merge.merge_id

get_merge_status

Get the status of a merge process.
def get_merge_status(self, merge_id: str) -> MergeState:
    """
    Get the status of a merge process.
    
    Args:
        merge_id: ID of the merge to check
        
    Returns:
        MergeState identifier (e.g., `MERGE_IN_PROGRESS`, `COMPLETED`)
    """

Example

state = client.get_merge_status("your-merge-id")
print(f"Merge state: {state}")

get_conflicts

Get conflicts for a merge process.
def get_conflicts(self, merge_id: str) -> List[ChangeLog]:
    """
    Get conflicts for a merge process.
    
    Args:
        merge_id: ID of the merge process
        
    Returns:
        List of change logs requiring human review
    """

Example

conflicts = client.get_conflicts("your-merge-id")
for conflict in conflicts:
    print(f"Conflict ID: {conflict.id}")
    print(f"Node: {conflict.staging_node.id} -> {conflict.prop_changes}")

resolve_conflict

Apply a resolution to a specific conflict.
def resolve_conflict(
    self,
    merge_id: str,
    conflict_id: str,
    changed_props: Dict[str, Any],
    resolution: ResolutionStrategy,
    learning_comment: str = ""
) -> bool:
    """
    Apply a resolution to a specific conflict.
    
    Args:
        merge_id: ID of the merge process
        conflict_id: ID of the conflict to resolve
        changed_props: Properties that were changed
        resolution: The resolution decision
        learning_comment: Comment on the resolution
        
    Returns:
        True if the resolution was applied successfully
    """

Example

from graphora.models import ResolutionStrategy

success = client.resolve_conflict(
    merge_id="your-merge-id",
    conflict_id="conflict-123",
    changed_props={},
    resolution=ResolutionStrategy.KEEP_STAGING,
    learning_comment="Keep staging data; production is stale"
)

if success:
    print("Conflict resolved successfully")
else:
    print("Failed to resolve conflict")

get_merge_statistics

Get detailed statistics of a merge operation.
def get_merge_statistics(self, merge_id: str) -> Dict[str, Any]:
    """
    Get detailed statistics of a merge operation.
    
    Args:
        merge_id: ID of the merge process
        
    Returns:
        Dictionary of merge statistics
    """

Example

statistics = client.get_merge_statistics("your-merge-id")
print("Merge Statistics:")
for key, value in statistics.items():
    print(f"  {key}: {value}")

get_merged_graph

Retrieve graph data for a merge process.
def get_merged_graph(self, merge_id: str, transform_id: str) -> GraphResponse:
    """
    Retrieve graph data for a merge process.
    
    Args:
        merge_id: ID of the merge process
        transform_id: ID of the transformation
        
    Returns:
        GraphResponse with nodes and edges
    """

Example

graph = client.get_merged_graph(
    merge_id="your-merge-id",
    transform_id="your-transform-id"
)
print(f"Graph has {len(graph.nodes)} nodes and {len(graph.edges)} edges")

Quality Validation Methods

The GraphoraClient includes comprehensive quality validation methods to assess and manage the quality of extracted knowledge graphs.

get_quality_results

Get quality validation results for a transform.
def get_quality_results(self, transform_id: str) -> QualityResults:
    """
    Get quality validation results for a transform.
    
    Args:
        transform_id: ID of the transformation to get quality results for
        
    Returns:
        QualityResults with validation metrics, violations, and scores
    """

Example

from graphora import GraphoraClient

client = GraphoraClient(base_url="https://api.graphora.io", user_id="user-123")

# Get quality results
results = client.get_quality_results("your-transform-id")

print(f"Overall Score: {results.overall_score:.1f}/100")
print(f"Grade: {results.grade}")
print(f"Requires Review: {results.requires_review}")
print(f"Total Violations: {len(results.violations)}")
print(f"Rules Applied: {results.rules_applied}")
print(f"Property Completeness: {results.metrics.property_completeness_rate:.1f}%")

# Check violations by severity
for severity, count in results.violations_by_severity.items():
    print(f"  {severity}: {count}")

approve_quality_results

Approve quality validation results and proceed to merge.
def approve_quality_results(
    self, 
    transform_id: str, 
    approval_comment: Optional[str] = None
) -> QualityApprovalResponse:
    """
    Approve quality validation results and proceed to merge.
    
    Args:
        transform_id: ID of the transformation to approve
        approval_comment: Optional comment about the approval
        
    Returns:
        QualityApprovalResponse confirming the approval
    """

Example

# Approve with comment
approval = client.approve_quality_results(
    transform_id="your-transform-id",
    approval_comment="Quality looks good, proceeding to merge"
)

print(f"Status: {approval.status}")
print(f"Message: {approval.message}")

reject_quality_results

Reject quality validation results and stop the process.
def reject_quality_results(
    self, 
    transform_id: str, 
    rejection_reason: str
) -> QualityRejectionResponse:
    """
    Reject quality validation results and stop the process.
    
    Args:
        transform_id: ID of the transformation to reject
        rejection_reason: Required reason for rejecting the results
        
    Returns:
        QualityRejectionResponse confirming the rejection
    """

Example

# Reject with reason
rejection = client.reject_quality_results(
    transform_id="your-transform-id",
    rejection_reason="Too many validation errors, needs manual review"
)

print(f"Status: {rejection.status}")
print(f"Reason: {rejection.reason}")

get_quality_violations

Get detailed quality violations with filtering and pagination.
def get_quality_violations(
    self,
    transform_id: str,
    violation_type: Optional[QualityRuleType] = None,
    severity: Optional[QualitySeverity] = None,
    entity_type: Optional[str] = None,
    limit: int = 100,
    offset: int = 0
) -> QualityViolationsResponse:
    """
    Get detailed quality violations with filtering and pagination.
    
    Args:
        transform_id: ID of the transformation
        violation_type: Filter by rule type (format, business, etc.)
        severity: Filter by severity level (error, warning, info)
        entity_type: Filter by entity type
        limit: Maximum number of violations to return (1-1000)
        offset: Number of violations to skip for pagination
        
    Returns:
        QualityViolationsResponse with filtered violations
    """

Example

from graphora.models import QualitySeverity, QualityRuleType

# Get only error violations
error_violations = client.get_quality_violations(
    transform_id="your-transform-id",
    severity=QualitySeverity.ERROR,
    limit=50
)

print(f"Found {error_violations.total_returned} error violations:")
for violation in error_violations.violations:
    print(f"  {violation.message}")
    print(f"    Entity: {violation.entity_type} (ID: {violation.entity_id})")
    if violation.property_name:
        print(f"    Property: {violation.property_name}")
    print(f"    Expected: {violation.expected}")
    print(f"    Actual: {violation.actual}")
    if violation.suggestion:
        print(f"    Suggestion: {violation.suggestion}")

# Get format violations for Company entities
format_violations = client.get_quality_violations(
    transform_id="your-transform-id",
    violation_type=QualityRuleType.FORMAT,
    entity_type="Company",
    limit=25
)

get_quality_summary

Get summary of recent quality results for the user.
def get_quality_summary(self, limit: int = 10) -> QualitySummaryResponse:
    """
    Get summary of recent quality results for the user.
    
    Args:
        limit: Number of recent results to return (1-50)
        
    Returns:
        QualitySummaryResponse with recent quality result summaries
    """

Example

# Get recent quality results
summary = client.get_quality_summary(limit=5)

print(f"Recent quality results: {summary.total_returned}")
for i, result in enumerate(summary.recent_quality_results, 1):
    print(f"{i}. Transform: {result.get('transform_id', 'N/A')}")
    print(f"   Score: {result.get('overall_score', 'N/A')}")
    print(f"   Grade: {result.get('grade', 'N/A')}")
    print(f"   Status: {result.get('status', 'N/A')}")

delete_quality_results

Delete quality validation results for a transform.
def delete_quality_results(self, transform_id: str) -> QualityDeleteResponse:
    """
    Delete quality validation results for a transform.
    
    Args:
        transform_id: ID of the transformation to delete results for
        
    Returns:
        QualityDeleteResponse confirming the deletion
    """

Example

# Delete quality results
deletion = client.delete_quality_results("your-transform-id")
print(f"Deletion result: {deletion.message}")

get_quality_health

Check the health and availability of the quality validation API.
def get_quality_health(self) -> QualityHealthResponse:
    """
    Check the health and availability of the quality validation API.
    
    Returns:
        QualityHealthResponse with health status information
    """

Example

# Check quality API health
health = client.get_quality_health()
print(f"Quality API Status: {health.status}")
print(f"Available: {health.quality_api_available}")
print(f"Message: {health.message}")

if not health.quality_api_available:
    print("Quality validation is not available!")

Quality Workflow Example

Here’s a complete example showing a typical quality validation workflow:
from graphora import GraphoraClient
from graphora.models import QualitySeverity
from graphora.exceptions import GraphoraAPIError

client = GraphoraClient(
    base_url="https://api.graphora.io",
    user_id="user-123",
    api_key="your-api-key"
)

try:
    # 1. Check if quality API is available
    health = client.get_quality_health()
    if not health.quality_api_available:
        print("Quality validation not available")
        exit(1)
    
    # 2. Get quality results
    transform_id = "your-transform-id"
    results = client.get_quality_results(transform_id)
    
    print(f"Quality Score: {results.overall_score:.1f}/100")
    print(f"Grade: {results.grade}")
    print(f"Total Violations: {len(results.violations)}")
    
    # 3. Check for critical errors
    error_violations = client.get_quality_violations(
        transform_id=transform_id,
        severity=QualitySeverity.ERROR,
        limit=10
    )
    
    # 4. Make decision based on quality
    if results.overall_score >= 90 and error_violations.total_returned == 0:
        # High quality - auto approve
        approval = client.approve_quality_results(
            transform_id=transform_id,
            approval_comment="High quality score - auto approved"
        )
        print(f"✅ {approval.message}")
        
    elif results.overall_score >= 70:
        # Moderate quality - manual review
        print("⚠️ Moderate quality - requires manual review")
        
        # Show top violations for review
        for violation in error_violations.violations[:5]:
            print(f"❌ {violation.message}")
            print(f"   Entity: {violation.entity_type}.{violation.property_name}")
        
        # In a real application, you'd prompt for user decision
        # For this example, we'll approve with a comment
        approval = client.approve_quality_results(
            transform_id=transform_id,
            approval_comment=f"Manually approved despite {error_violations.total_returned} errors"
        )
        print(f"✅ {approval.message}")
        
    else:
        # Low quality - reject
        rejection = client.reject_quality_results(
            transform_id=transform_id,
            rejection_reason=f"Quality too low: {results.overall_score:.1f}/100 with {error_violations.total_returned} errors"
        )
        print(f"❌ {rejection.message}")
        print(f"Reason: {rejection.reason}")

except GraphoraAPIError as e:
    print(f"API Error: {e.message} (Status: {e.status_code})")
except Exception as e:
    print(f"Error: {str(e)}")

Error Handling

The GraphoraClient methods can raise the following exceptions:
  • GraphoraAPIError: Raised when the API returns an error
  • GraphoraClientError: Raised for client-side errors
from graphora import GraphoraClient
from graphora.exceptions import GraphoraAPIError, GraphoraClientError

client = GraphoraClient(base_url="https://api.graphora.io")

try:
    ontology_response = client.register_ontology("invalid yaml")
except GraphoraAPIError as e:
    print(f"API Error (Status {e.status_code}): {str(e)}")
except GraphoraClientError as e:
    print(f"Client Error: {str(e)}")
except Exception as e:
    print(f"Unexpected error: {str(e)}")

Dashboard Methods

get_dashboard_summary

Retrieve the KPI snapshot used at the top of the dashboard.
from graphora import GraphoraClient

client = GraphoraClient(base_url="https://api.graphora.io", auth_token="...")
summary = client.get_dashboard_summary(days=14)
print(summary.total_runs, summary.average_duration_ms)

get_dashboard_runs

Fetch recent transform runs with aggregated quality and token information.
runs = client.get_dashboard_runs(limit=10)
for run in runs.runs:
    print(run.document_name, run.quality_score, run.llm_usage.total_tokens)

get_dashboard_performance

Retrieve time-series performance metrics (duration, token usage, cost) for charting.
performance = client.get_dashboard_performance(days=7)
for point in performance.timeseries:
    print(point.date, point.average_duration_ms)

get_dashboard_quality

Access aggregated quality stats including rule frequencies and entity coverage.
quality = client.get_dashboard_quality(days=30)
print(quality.average_score, quality.top_rules[:3])

Next Steps