Skip to main content

Overview

This page gives you a working Python integration. Each function maps to one ION operation you can lift into your own codebase. By the end you have:
  • A reusable IonClient that handles auth, query and mutation calls, and error parsing.
  • Working examples for four common starting tasks: list parts, create a run, submit a run step result, and upload a file attachment.
  • Patterns for fragments, error handling, and retry with backoff.
The script needs two environment variables:
export ION_BASE_URL="https://api.firstresonance.io"
export ION_TOKEN="<your-access-token>"
If you don’t have a token yet, see Getting a token.

Set up the environment

python -m venv .venv
source .venv/bin/activate
pip install "httpx>=0.27" "tenacity>=8.0"
You don’t need a GraphQL-specific client. httpx handles the transport. tenacity gives you declarative retry. If you prefer the gql library, these patterns translate directly.

Build the client

import os
import httpx
from typing import Any
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class IonError(RuntimeError):
    """Raised when ION returns errors[] in the response body."""
    def __init__(self, errors: list[dict[str, Any]]):
        self.errors = errors
        super().__init__("; ".join(e.get("message", "") for e in errors))


class IonClient:
    def __init__(self, base_url: str | None = None, token: str | None = None):
        self.base_url = (base_url or os.environ["ION_BASE_URL"]).rstrip("/")
        self.token = token or os.environ["ION_TOKEN"]
        self._http = httpx.Client(
            base_url=self.base_url,
            headers={"Authorization": f"Bearer {self.token}"},
            timeout=30.0,
        )

    @retry(
        retry=retry_if_exception_type((httpx.HTTPError,)),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        reraise=True,
    )
    def _post_graphql(self, query: str, variables: dict[str, Any] | None = None) -> dict[str, Any]:
        r = self._http.post(
            "/graphql",
            json={"query": query, "variables": variables or {}},
        )
        r.raise_for_status()
        payload = r.json()
        if payload.get("errors"):
            raise IonError(payload["errors"])
        return payload["data"]

    def query(self, query: str, **variables) -> dict[str, Any]:
        return self._post_graphql(query, variables)

    def mutate(self, mutation: str, **variables) -> dict[str, Any]:
        return self._post_graphql(mutation, variables)

    def close(self):
        self._http.close()
You only call IonClient.query and IonClient.mutate. _post_graphql handles transport-level retries for 5xx and network errors. It surfaces GraphQL-level errors as IonError.

Confirm the token works

Check who you are authenticated as before you do anything else:
ME = """
query CurrentUser {
  me {
    id
    name
    email
    organization { id domain }
  }
}
"""

with IonClient() as ion:
    me = ion.query(ME)["me"]
    print(f"Authenticated as {me['name']} ({me['email']}) in org {me['organization']['domain']}")
If this fails, see the 401 table in Error codes. The error message names the fix.

List parts

LIST_PARTS = """
query Parts($filters: PartsInputFilters, $first: Int) {
  parts(filters: $filters, first: $first) {
    edges {
      node {
        id
        partNumber
        description
        revision
        partType
        status
        partSubtypes { id name }
      }
    }
  }
}
"""

with IonClient() as ion:
    edges = ion.query(LIST_PARTS, filters={}, first=20)["parts"]["edges"]
    for edge in edges:
        p = edge["node"]
        subtypes = ", ".join(s["name"] for s in p["partSubtypes"]) or "—"
        print(f"{p['partNumber']}\t{p['description']}\t{subtypes}")

Create a run

CREATE_RUN = """
mutation CreateRun($input: CreateRunInput!) {
  createRun(input: $input) {
    run {
      id
      title
      status
      procedure { id title }
      partInventory { id serialNumber }
    }
  }
}
"""

with IonClient() as ion:
    new_run = ion.mutate(
        CREATE_RUN,
        input={
            "procedureId": 12,
            "partInventoryId": 9876,
            "title": "Build #2026-04-26 Unit 1",
        },
    )["createRun"]["run"]
    print(f"Created run {new_run['id']}: {new_run['title']}")
Replace procedureId and partInventoryId with real IDs from your org. To find candidate procedures, see Example requests.

Submit a run step result

SUBMIT_STEP = """
mutation SubmitStep($input: UpdateRunStepInput!) {
  updateRunStep(input: $input) {
    runStep {
      id
      status
      endTime
      fields { id name value }
    }
  }
}
"""

with IonClient() as ion:
    result = ion.mutate(
        SUBMIT_STEP,
        input={
            "id": 4567,
            "etag": "abc123",        # required for optimistic concurrency
            "status": "COMPLETE",
        },
    )["updateRunStep"]["runStep"]
    print(f"Step {result['id']}{result['status']} at {result['endTime']}")
The etag field is required on update. For details, see Etag-based concurrency. If you get a 409, re-fetch the run step, take its new etag, and retry.

Upload a file attachment

This is the full three-step pattern from File upload:
import os

REQUEST_UPLOAD = """
query Upload($filename: String!) {
  requestFileUploadUrl(filename: $filename) {
    uploadUrl
    s3Key
    contentType
  }
}
"""

GET_RUN_STEP_ENTITY = """
query StepEntity($id: ID!) {
  runStep(id: $id) {
    entity { id }
  }
}
"""

CREATE_FILE_ATTACHMENT = """
mutation CreateAttachment($input: CreateFileAttachmentInput!) {
  createFileAttachment(input: $input) {
    fileAttachment { id filename downloadUrl }
  }
}
"""

def attach_to_run_step(ion: IonClient, run_step_id: int, file_path: str) -> int:
    filename = os.path.basename(file_path)

    # 1. Get a signed upload URL
    upload = ion.query(REQUEST_UPLOAD, filename=filename)["requestFileUploadUrl"]

    # 2. PUT the file to S3, capture the ETag
    with open(file_path, "rb") as f:
        s3_response = httpx.put(
            upload["uploadUrl"],
            content=f.read(),
            headers={"Content-Type": upload["contentType"]},
            timeout=120.0,
        )
    s3_response.raise_for_status()
    s3_etag = s3_response.headers["ETag"].strip('"')

    # 3. Resolve the run step's entity.id (NOT the run step's own id)
    entity_id = ion.query(GET_RUN_STEP_ENTITY, id=run_step_id)["runStep"]["entity"]["id"]

    # 4. Register the attachment
    attachment = ion.mutate(
        CREATE_FILE_ATTACHMENT,
        input={
            "filename": filename,
            "s3Key": upload["s3Key"],
            "s3Etag": s3_etag,
            "entityId": entity_id,
            "validateUpload": True,
        },
    )["createFileAttachment"]["fileAttachment"]
    return attachment["id"]


with IonClient() as ion:
    attachment_id = attach_to_run_step(ion, run_step_id=4567, file_path="./inspection.jpg")
    print(f"Attached file id={attachment_id}")
Two parts of this pattern are easy to miss. You must capture the S3 ETag. You must fetch entity.id, not the parent’s id. Both are documented in File upload.

Error handling pattern

The IonClient raises IonError on GraphQL-level errors. It uses tenacity for transport retries. Wrap your business logic in a try and except block that handles each error type:
from httpx import HTTPStatusError, ConnectError

def safely_create_run(ion: IonClient, **inputs) -> int | None:
    try:
        result = ion.mutate(CREATE_RUN, input=inputs)
    except IonError as e:
        # GraphQL errors[] — usually 4xx auth/permission/validation
        # Surface to the user; don't retry blindly
        print(f"ION rejected the request: {e}")
        return None
    except HTTPStatusError as e:
        # 5xx after retries exhausted
        print(f"ION 5xx after retries: {e.response.status_code}")
        return None
    except ConnectError:
        # Network down
        print("Cannot reach ION")
        return None
    return result["createRun"]["run"]["id"]
For the full HTTP status reference and the GraphQL errors[] payload shape, see Error codes.

Reuse fragments

When several queries return the same entity, such as parts, runs, or purchase orders, define one fragment per entity and inline it everywhere:
FRAGMENTS = """
fragment PartFields on Part {
  id
  partNumber
  description
  revision
  partType
  status
  partSubtypes { id name }
}

fragment RunFields on Run {
  id
  title
  status
  procedure { id title }
  partInventory { id serialNumber part { ...PartFields } }
}
"""

LIST_RUNS_WITH_PARTS = FRAGMENTS + """
query Runs($filters: RunsInputFilters) {
  runs(filters: $filters) {
    edges {
      node {
        ...RunFields
      }
    }
  }
}
"""
This keeps field selection consistent across calls. A forgotten field, such as _etag, becomes a one-line fix instead of a multi-file refactor.