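To use the Graphora API, you’ll need an API key and a user ID. The user ID is required to scope all API operations to your account. You can provide these credentials in several ways, for example by passing `user_id` and `api_key` directly to `GraphoraClient`, or by setting the `GRAPHORA_API_KEY` environment variable instead of passing the API key.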
An ontology defines the structure of your knowledge graph, including entity types and their relationships.
Copy
from graphora import GraphoraClient

# Initialize client with user ID (required)
client = GraphoraClient(
    base_url="https://api.graphora.io",
    user_id="your-user-id",  # Required for all API calls
    api_key="your-api-key",  # Or set GRAPHORA_API_KEY environment variable
)

# Load ontology from a file.
# The encoding is given explicitly so the read does not depend on the
# platform's default locale encoding.
with open("ontology.yaml", "r", encoding="utf-8") as f:
    ontology_yaml = f.read()

# Register and validate the ontology
ontology_response = client.register_ontology(ontology_yaml)
ontology_id = ontology_response.id
print(f"Ontology ID: {ontology_id}")
# Get the merged graph.
# NOTE: `client`, `merge_id` and `transform_id` are not defined in this
# snippet; they must already exist from the earlier client-setup, transform
# and merge steps.
merged_graph = client.get_merged_graph(merge_id=merge_id, transform_id=transform_id)
print(f"Merged graph has {len(merged_graph.nodes)} nodes and {len(merged_graph.edges)} edges")
Here’s a complete example that ties all these steps together:
Copy
import time

from graphora import GraphoraClient

# Merge polling budget: 30 checks, 10 seconds apart (about 5 minutes in total).
MERGE_POLL_ATTEMPTS = 30
MERGE_POLL_INTERVAL_SECONDS = 10

# Initialize client with user ID (required)
client = GraphoraClient(
    base_url="https://api.graphora.io",
    user_id="your-user-id",  # Required: User ID for all API calls
    api_key="your-api-key",  # Or set GRAPHORA_API_KEY environment variable
)

# Load and upload ontology.
# The encoding is given explicitly so the read does not depend on the
# platform's default locale encoding.
with open("ontology.yaml", "r", encoding="utf-8") as f:
    ontology_yaml = f.read()

ontology_response = client.register_ontology(ontology_yaml)
ontology_id = ontology_response.id
print(f"Ontology ID: {ontology_id}")

# Upload documents for processing
transform_response = client.transform(
    ontology_id=ontology_id,
    files=["document1.pdf", "document2.txt"],
)
transform_id = transform_response.id
print(f"Transform ID: {transform_id}")

# Wait for transformation to complete
final_status = client.wait_for_transform(transform_id)
print(f"Transformation completed with status: {final_status.status}")

# Get the transformed graph
graph = client.get_transformed_graph(transform_id=transform_id)
print(f"Graph has {len(graph.nodes)} nodes and {len(graph.edges)} edges")

# Start merge process
merge_response = client.start_merge(
    session_id=ontology_id,
    transform_id=transform_id,
)
merge_id = merge_response.merge_id
print(f"Merge ID: {merge_id}")

# Wait for merge to complete (simple polling)
for _ in range(MERGE_POLL_ATTEMPTS):
    status = client.get_merge_status(merge_id)
    if status.status in ("COMPLETED", "FAILED"):
        # Report the terminal state so a FAILED merge is visible in the output.
        print(f"Merge finished with status: {status.status}")
        break
    print(f"Merge status: {status.status}, Progress: {status.progress:.2%}")
    time.sleep(MERGE_POLL_INTERVAL_SECONDS)
else:
    # The loop ran out of attempts without seeing a terminal status. The
    # script still continues to the conflict step below, but says so
    # instead of falling through silently.
    print("Merge did not reach COMPLETED or FAILED within the polling window; continuing")

# Get conflicts if any
conflicts = client.get_conflicts(merge_id)
if conflicts:
    print(f"Found {len(conflicts)} conflicts")
    # Resolve conflicts
    for conflict in conflicts:
        client.resolve_conflict(
            merge_id=merge_id,
            conflict_id=conflict.id,
            changed_props={},
            resolution="accept",
            learning_comment="Accepting this entity",
        )

# Get the final merged graph
merged_graph = client.get_merged_graph(merge_id=merge_id, transform_id=transform_id)
print(f"Final merged graph has {len(merged_graph.nodes)} nodes and {len(merged_graph.edges)} edges")