Model Integration

HiveTraceRed supports multiple LLM providers and makes it easy to add new ones. This guide shows how to use built-in models and create custom integrations.

Supported Models

Built-in Provider Support

HiveTraceRed provides the following model classes for various LLM providers:

  • OpenAIModel: OpenAI models

  • GigaChatModel: Sber’s GigaChat models

  • YandexGPTModel: Yandex GPT models

  • GeminiNativeModel: Google Gemini models

  • OpenRouterModel: Access to multiple model providers

  • CloudRuModel: Cloud.ru ML models

  • OllamaModel: Local models via Ollama server

  • LlamaCppModel: Local GGUF models via llama.cpp (direct inference)

Using Built-in Models

OpenAI Models

from hivetracered.models import OpenAIModel

# Basic usage
model = OpenAIModel(model="gpt-4.1")

# With parameters
model = OpenAIModel(
    model="gpt-4.1",
    temperature=0.7,
    max_tokens=1000
)

# Synchronous call
response = model.invoke("Hello")
print(response['content'])

# Asynchronous call (inside an async function)
response = await model.ainvoke("Hello")

GigaChat Models

from hivetracered.models import GigaChatModel

model = GigaChatModel(
    model="gigachat",
    credentials="YOUR_CREDENTIALS",
    verify_ssl_certs=False
)

response = await model.ainvoke("Hello, how are you?")

Yandex Models

from hivetracered.models import YandexGPTModel

model = YandexGPTModel(
    model="yandexgpt-lite",
    folder_id="YOUR_FOLDER_ID",
    api_key="YOUR_API_KEY"
)

response = await model.ainvoke("Tell me about Python")

Google Gemini Models

from hivetracered.models import GeminiNativeModel

model = GeminiNativeModel(
    model="gemini-2.5-flash-preview-04-17",
    api_key="YOUR_API_KEY"
)

response = await model.ainvoke("Explain quantum computing")

OpenRouter

from hivetracered.models import OpenRouterModel

model = OpenRouterModel(
    model="openai/gpt-4.1",
    api_key="YOUR_OPENROUTER_KEY"
)

response = await model.ainvoke("Tell me a joke")

Ollama Models

from hivetracered.models import OllamaModel

# Requires a running Ollama server (https://ollama.com)
model = OllamaModel(
    model="qwen3:0.6b",
    base_url="http://localhost:11434"
)

response = await model.ainvoke("Explain quantum computing")

Llama.cpp Models (Local GGUF)

from hivetracered.models import LlamaCppModel

# CPU-only inference
model = LlamaCppModel(
    model_path="/path/to/model.gguf",
    n_ctx=8192,
    max_concurrency=5
)

# GPU-accelerated inference
model = LlamaCppModel(
    model_path="/path/to/llama-3.2-8b-instruct-q5.gguf",
    n_gpu_layers=-1,  # Auto-detect and use all GPU layers
    n_ctx=16384,
    temperature=0.7,
    max_tokens=2048
)

response = await model.ainvoke("Explain machine learning")

Installation for GPU acceleration:

# NVIDIA GPU (CUDA)
CMAKE_ARGS="-DGGML_CUDA=on" pip install llama-cpp-python

# Apple Silicon (Metal)
CMAKE_ARGS="-DGGML_METAL=on" pip install llama-cpp-python

Download GGUF models:

GGUF models are available on Hugging Face: https://huggingface.co/models?library=gguf

Popular model families such as Llama, Mistral, Qwen, and Phi are available in various quantization levels (Q4, Q5, Q8).
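
For a programmatic download, the huggingface_hub package can fetch a GGUF file directly; a minimal sketch (the repo and file names below are illustrative):

from huggingface_hub import hf_hub_download

# Download a quantized GGUF file; repo_id and filename are illustrative examples
model_path = hf_hub_download(
    repo_id="Qwen/Qwen2.5-0.5B-Instruct-GGUF",
    filename="qwen2.5-0.5b-instruct-q5_k_m.gguf"
)

# The resulting path can be passed to LlamaCppModel(model_path=model_path)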

Model Interface

All models implement the Model base class with these methods:

Synchronous Methods

# Single request
response = model.invoke(prompt)

# Batch requests (max_concurrency is set when initializing the model)
responses = model.batch(prompts)

Asynchronous Methods

# Single request
response = await model.ainvoke(prompt)

# Batch requests (max_concurrency is set when initializing the model)
responses = await model.abatch(prompts)

# Streaming batch (uses model's max_concurrency setting)
async for response in model.stream_abatch(prompts):
    print(response)
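
For example, a batch can be driven from synchronous code with asyncio.run (a minimal sketch; the prompts are placeholders):

import asyncio
from hivetracered.models import OpenAIModel

model = OpenAIModel(model="gpt-4.1")
prompts = ["Hello", "What is 2 + 2?", "Name three colors"]

# abatch runs requests concurrently up to the model's max_concurrency
responses = asyncio.run(model.abatch(prompts))
for response in responses:
    print(response["content"])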

Message Formats

String Format

response = await model.ainvoke("Hello")

Message List Format

messages = [
    {"role": "system", "content": "You are a helpful assistant"},
    {"role": "user", "content": "Hello"}
]
response = await model.ainvoke(messages)
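
Multi-turn conversations use the same format: append earlier assistant turns to the list (this assumes the provider accepts assistant-role messages, as OpenAI-style APIs do):

messages = [
    {"role": "system", "content": "You are a helpful assistant"},
    {"role": "user", "content": "What is the capital of France?"},
    {"role": "assistant", "content": "The capital of France is Paris."},
    {"role": "user", "content": "And roughly how many people live there?"}
]
response = await model.ainvoke(messages)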

Response Format

All models return a dictionary:

{
    "content": "The model's response text",
    "response_metadata": {
        "model_name": "gpt-4.1",
        "finish_reason": "stop",
        # Additional provider-specific fields
    }
}
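
Fields inside response_metadata vary by provider, so reading them defensively is a reasonable pattern (a sketch):

response = await model.ainvoke("Hello")
print(response["content"])

# Metadata keys beyond model_name and finish_reason are provider-specific
finish_reason = response.get("response_metadata", {}).get("finish_reason")
if finish_reason == "content_filter":
    print("The response was blocked by a safety filter")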

Creating Custom Models

To integrate a new LLM provider, inherit from the Model base class.

Basic Custom Model

from hivetracered.models import Model
from typing import Union, List, Dict
import asyncio

class MyCustomModel(Model):
    def __init__(self, model: str, api_key: str, max_concurrency: int = 10, **kwargs):
        self.model_name = model
        self.api_key = api_key
        self.max_concurrency = max_concurrency
        self.params = kwargs

    def invoke(self, prompt: Union[str, List[Dict]]) -> dict:
        """Synchronous single request"""
        # Your implementation
        response_text = self._call_api(prompt)
        return {
            "content": response_text,
            "response_metadata": {"model_name": self.model_name}
        }

    async def ainvoke(self, prompt: Union[str, List[Dict]]) -> dict:
        """Asynchronous single request"""
        # Your async implementation
        response_text = await self._async_call_api(prompt)
        return {
            "content": response_text,
            "response_metadata": {"model_name": self.model_name}
        }

    def batch(self, prompts: List) -> List[dict]:
        """Synchronous batch processing"""
        return [self.invoke(p) for p in prompts]

    async def abatch(self, prompts: List) -> List[dict]:
        """Asynchronous batch processing"""
        # Process in chunks based on max_concurrency
        results = []
        for i in range(0, len(prompts), self.max_concurrency):
            batch = prompts[i:i + self.max_concurrency]
            tasks = [self.ainvoke(p) for p in batch]
            results.extend(await asyncio.gather(*tasks))
        return results

    async def stream_abatch(self, prompts: List):
        """Stream results as they complete"""
        for i in range(0, len(prompts), self.max_concurrency):
            batch = prompts[i:i + self.max_concurrency]
            responses = await self.abatch(batch)
            for response in responses:
                yield response

    def _call_api(self, prompt):
        """Your API call implementation"""
        pass

    async def _async_call_api(self, prompt):
        """Your async API call implementation"""
        pass
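
Once _call_api and _async_call_api are implemented, the model can be exercised directly (a hypothetical smoke test):

model = MyCustomModel(model="my-model-v1", api_key="YOUR_KEY")
response = model.invoke("Hello")
print(response["content"])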

Advanced Custom Model

from hivetracered.models import Model
from typing import Union, List, Dict
import asyncio
import aiohttp

class AdvancedCustomModel(Model):
    def __init__(self, model: str, api_url: str, api_key: str, **kwargs):
        self.model_name = model
        self.api_url = api_url
        self.api_key = api_key
        self.temperature = kwargs.get('temperature', 0.7)
        self.max_tokens = kwargs.get('max_tokens', 1000)

    async def ainvoke(self, prompt: Union[str, List[Dict]]) -> dict:
        # Convert prompt to provider format
        formatted_prompt = self._format_prompt(prompt)

        # Make API call
        async with aiohttp.ClientSession() as session:
            headers = {"Authorization": f"Bearer {self.api_key}"}
            payload = {
                "model": self.model_name,
                "messages": formatted_prompt,
                "temperature": self.temperature,
                "max_tokens": self.max_tokens
            }

            async with session.post(
                self.api_url,
                json=payload,
                headers=headers
            ) as response:
                data = await response.json()
                return self._parse_response(data)

    def _format_prompt(self, prompt):
        """Convert to provider's format"""
        if isinstance(prompt, str):
            return [{"role": "user", "content": prompt}]
        return prompt

    def _parse_response(self, data):
        """Extract content from the provider's response"""
        return {
            "content": data['choices'][0]['message']['content'],
            "response_metadata": {
                "model_name": self.model_name,
                "finish_reason": data['choices'][0]['finish_reason']
            }
        }

    def is_answer_blocked(self, answer: dict) -> bool:
        """Check if the response was blocked by safety filters"""
        return answer.get('response_metadata', {}).get('finish_reason') == 'content_filter'

    def invoke(self, prompt):
        """Sync wrapper around the async implementation"""
        return asyncio.run(self.ainvoke(prompt))

    # Implement other required methods...

Safety Filters

Detecting Blocked Responses

Override is_answer_blocked to detect safety filter activations:

class SafetyAwareModel(Model):
    def is_answer_blocked(self, answer: dict) -> bool:
        # Check for safety filter indicators
        if answer.get('response_metadata', {}).get('finish_reason') == 'content_filter':
            return True
        if 'blocked' in answer.get('content', '').lower():
            return True
        return False

This is used by the pipeline to track successful jailbreaks.
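
For example, a caller can separate blocked and successful answers like this (a sketch; is_answer_blocked receives the response dictionary returned by the model):

answer = await model.ainvoke(prompt)
if model.is_answer_blocked(answer):
    print("Blocked by safety filter")
else:
    print(answer["content"])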

Error Handling

Implement robust error handling:

# RateLimitError, APIError, and logger are placeholders for your provider
# SDK's exception types and your own logging setup
async def ainvoke(self, prompt):
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return await self._async_call_api(prompt)
        except RateLimitError:
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise
        except APIError as e:
            logger.error(f"API error: {e}")
            raise

Registering Custom Models

Add to the model registry for use in configuration files:

# In pipeline/constants.py
from hivetracered.models import OpenAIModel
from hivetracered.models.my_custom_model import MyCustomModel

MODEL_CLASSES = {
    "my-custom-model": MyCustomModel,
    "gpt-4.1": OpenAIModel,
    # ... other models
}

Then use in configuration:

response_model:
  name: my-custom-model
  params:
    api_key: YOUR_KEY
    custom_param: value
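
Conceptually, the registry lookup resolves the configured name to a class and passes params as keyword arguments; a hypothetical sketch of that resolution:

# Hypothetical sketch of how a config entry maps onto the registry
config = {"name": "my-custom-model", "params": {"api_key": "YOUR_KEY", "custom_param": "value"}}

model_cls = MODEL_CLASSES[config["name"]]
model = model_cls(**config["params"])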

Web-Based Models (Browser Automation)

For AI models without an official API, you can use the WebModel framework to automate browser interactions using Playwright.

Overview

WebModel enables testing of web-based chat interfaces:

  • Chat interfaces without public APIs

  • Custom AI deployments

  • On-premise models with web UIs only

Quick Example

from hivetracered.models import MistralWebModel

# Mistral Le Chat example (built-in)
model = MistralWebModel(
    model="mistral-large",
    headless=False,  # Show browser for debugging
    max_concurrency=2
)

response = model.invoke("Explain quantum computing")
print(response["content"])

model.close()  # Always close browser

Creating Custom Web Models

Use the Web Action Recorder to identify UI elements, then create your model:

from hivetracered.models.web_model import WebModel
from playwright.async_api import Page

class MyWebModel(WebModel):
    def __init__(self, **kwargs):
        super().__init__(model="my-chat", **kwargs)
        self.target_url = "https://your-chat.com"

    async def _send_message_and_get_response(
        self, page: Page, message: str
    ) -> str:
        # Find input, send message, wait for response
        input_elem = await page.wait_for_selector('textarea#chat-input')
        await input_elem.type(message)
        await page.keyboard.press('Enter')

        # Wait for stable response
        return await self._wait_for_stable_response(
            page,
            'div.response-message'
        )
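
Usage then mirrors the built-in web models (assuming MyWebModel inherits invoke and close from WebModel):

model = MyWebModel(headless=True, max_concurrency=1)
try:
    response = model.invoke("Hello")
    print(response["content"])
finally:
    model.close()  # Always close the browser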

See the Web-Based Models guide for detailed instructions on:

  • Using the Web Action Recorder to capture UI interactions

  • Creating custom web model implementations

  • Handling consent dialogs, login flows, and streaming responses

  • Best practices for browser automation

Best Practices

  1. Handle Both Sync and Async: Implement both invoke and ainvoke

  2. Support Message Formats: Handle both string and message list inputs

  3. Implement Batching: Use batching for efficiency

  4. Add Error Handling: Implement retries and proper error messages

  5. Detect Safety Filters: Override is_answer_blocked appropriately

See Also