Model Integration¶
HiveTraceRed supports multiple LLM providers and makes it easy to add new ones. This guide shows how to use built-in models and create custom integrations.
Supported Models¶
Built-in Provider Support¶
HiveTraceRed provides the following model classes for various LLM providers:
OpenAIModel: OpenAI models
GigaChatModel: Sber’s GigaChat models
YandexGPTModel: Yandex GPT models
GeminiNativeModel: Google Gemini models
OpenRouterModel: Access to multiple model providers
CloudRuModel: Cloud.ru ML models
OllamaModel: Local models via Ollama server
LlamaCppModel: Local GGUF models via llama.cpp (direct inference)
Using Built-in Models¶
OpenAI Models¶
from hivetracered.models import OpenAIModel
# Basic usage
model = OpenAIModel(model="gpt-4.1")
# With parameters
model = OpenAIModel(
    model="gpt-4.1",
    temperature=0.7,
    max_tokens=1000
)

# Synchronous call
response = model.invoke("Hello")
print(response['content'])

# Asynchronous call (from within an async function)
response = await model.ainvoke("Hello")
GigaChat Models¶
from hivetracered.models import GigaChatModel
model = GigaChatModel(
    model="gigachat",
    credentials="YOUR_CREDENTIALS",
    verify_ssl_certs=False
)

response = await model.ainvoke("Hello, how are you?")
Yandex Models¶
from hivetracered.models import YandexGPTModel
model = YandexGPTModel(
    model="yandexgpt-lite",
    folder_id="YOUR_FOLDER_ID",
    api_key="YOUR_API_KEY"
)

response = await model.ainvoke("Tell me about Python")
Google Gemini Models¶
from hivetracered.models import GeminiNativeModel
model = GeminiNativeModel(
    model="gemini-2.5-flash-preview-04-17",
    api_key="YOUR_API_KEY"
)
response = await model.ainvoke("Explain quantum computing")
OpenRouter¶
from hivetracered.models import OpenRouterModel
model = OpenRouterModel(
    model="openai/gpt-4.1",
    api_key="YOUR_OPENROUTER_KEY"
)
response = await model.ainvoke("Tell me a joke")
Ollama Models¶
from hivetracered.models import OllamaModel
# Requires Ollama server running (https://ollama.com)
model = OllamaModel(
    model="qwen3:0.6b",
    base_url="http://localhost:11434"
)
response = await model.ainvoke("Explain quantum computing")
Llama.cpp Models (Local GGUF)¶
from hivetracered.models import LlamaCppModel
# CPU-only inference
model = LlamaCppModel(
    model_path="/path/to/model.gguf",
    n_ctx=8192,
    max_concurrency=5
)

# GPU-accelerated inference
model = LlamaCppModel(
    model_path="/path/to/llama-3.2-8b-instruct-q5.gguf",
    n_gpu_layers=-1,  # Auto-detect and use all GPU layers
    n_ctx=16384,
    temperature=0.7,
    max_tokens=2048
)
response = await model.ainvoke("Explain machine learning")
Installation for GPU acceleration:
# NVIDIA GPU (CUDA)
CMAKE_ARGS="-DGGML_CUDA=on" pip install llama-cpp-python
# Apple Silicon (Metal)
CMAKE_ARGS="-DGGML_METAL=on" pip install llama-cpp-python
Download GGUF models:
GGUF models are available on Hugging Face: https://huggingface.co/models?library=gguf
Popular model families include Llama, Mistral, Qwen, Phi, and more in various quantization levels (Q4, Q5, Q8).
Model Interface¶
All models implement the Model base class with these methods:
Synchronous Methods¶
# Single request
response = model.invoke(prompt)
# Batch requests (max_concurrency is set when initializing the model)
responses = model.batch(prompts)
Asynchronous Methods¶
# Single request
response = await model.ainvoke(prompt)
# Batch requests (max_concurrency is set when initializing the model)
responses = await model.abatch(prompts)
# Streaming batch (uses model's max_concurrency setting)
async for response in model.stream_abatch(prompts):
    print(response)
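Outside a notebook, these coroutines must be driven by an event loop via asyncio.run. A minimal, self-contained sketch of the calling pattern, using a stub class in place of a real provider model (the stub and its names are illustrative, not part of the library):

```python
import asyncio

class StubModel:
    """Stand-in with the same ainvoke/abatch shape as a HiveTraceRed model."""
    async def ainvoke(self, prompt):
        return {"content": f"echo: {prompt}"}

    async def abatch(self, prompts):
        # Run all requests concurrently and keep input order
        return await asyncio.gather(*(self.ainvoke(p) for p in prompts))

async def main():
    model = StubModel()
    single = await model.ainvoke("Hello")
    batch = await model.abatch(["a", "b", "c"])
    return single, batch

single, batch = asyncio.run(main())
print(single["content"])              # echo: Hello
print([r["content"] for r in batch])  # ['echo: a', 'echo: b', 'echo: c']
```

The same asyncio.run wrapper works for any of the built-in models in place of the stub.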
Message Formats¶
String Format¶
response = await model.ainvoke("Hello")
Message List Format¶
messages = [
    {"role": "system", "content": "You are a helpful assistant"},
    {"role": "user", "content": "Hello"}
]
response = await model.ainvoke(messages)
Response Format¶
All models return a dictionary:
{
    "content": "The model's response text",
    "response_metadata": {
        "model_name": "gpt-4.1",
        "finish_reason": "stop",
        # Additional provider-specific fields
    }
}
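Callers typically read the text from content and, where needed, provider metadata from response_metadata, using .get so missing provider-specific keys do not raise. For example, with a dictionary of that shape:

```python
response = {
    "content": "The model's response text",
    "response_metadata": {
        "model_name": "gpt-4.1",
        "finish_reason": "stop",
    },
}

# Required field: direct indexing is safe
text = response["content"]

# Provider-specific field: fall back gracefully if absent
finish_reason = response.get("response_metadata", {}).get("finish_reason")

print(text)           # The model's response text
print(finish_reason)  # stop
```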
Creating Custom Models¶
To integrate a new LLM provider, inherit from the Model base class.
Basic Custom Model¶
from hivetracered.models import Model
from typing import Union, List, Dict
import asyncio
class MyCustomModel(Model):
    def __init__(self, model: str, api_key: str, max_concurrency: int = 10, **kwargs):
        self.model_name = model
        self.api_key = api_key
        self.max_concurrency = max_concurrency
        self.params = kwargs

    def invoke(self, prompt: Union[str, List[Dict]]) -> dict:
        """Synchronous single request"""
        # Your implementation
        response_text = self._call_api(prompt)
        return {
            "content": response_text,
            "model": self.model_name
        }

    async def ainvoke(self, prompt: Union[str, List[Dict]]) -> dict:
        """Asynchronous single request"""
        # Your async implementation
        response_text = await self._async_call_api(prompt)
        return {
            "content": response_text,
            "model": self.model_name
        }

    def batch(self, prompts: List) -> List[dict]:
        """Synchronous batch processing"""
        return [self.invoke(p) for p in prompts]

    async def abatch(self, prompts: List) -> List[dict]:
        """Asynchronous batch processing"""
        # Process in chunks based on max_concurrency
        results = []
        for i in range(0, len(prompts), self.max_concurrency):
            batch = prompts[i:i + self.max_concurrency]
            tasks = [self.ainvoke(p) for p in batch]
            results.extend(await asyncio.gather(*tasks))
        return results

    async def stream_abatch(self, prompts: List):
        """Stream results as they complete"""
        for i in range(0, len(prompts), self.max_concurrency):
            batch = prompts[i:i + self.max_concurrency]
            responses = await self.abatch(batch)
            for response in responses:
                yield response

    def _call_api(self, prompt):
        """Your API call implementation"""
        pass

    async def _async_call_api(self, prompt):
        """Your async API call implementation"""
        pass
Advanced Custom Model¶
from hivetracered.models import Model
from typing import Union, List, Dict
import aiohttp

class AdvancedCustomModel(Model):
    def __init__(self, model: str, api_url: str, api_key: str, **kwargs):
        self.model_name = model
        self.api_url = api_url
        self.api_key = api_key
        self.temperature = kwargs.get('temperature', 0.7)
        self.max_tokens = kwargs.get('max_tokens', 1000)

    async def ainvoke(self, prompt: Union[str, List[Dict]]) -> dict:
        # Convert prompt to provider format
        formatted_prompt = self._format_prompt(prompt)

        # Make API call
        async with aiohttp.ClientSession() as session:
            headers = {"Authorization": f"Bearer {self.api_key}"}
            payload = {
                "model": self.model_name,
                "messages": formatted_prompt,
                "temperature": self.temperature,
                "max_tokens": self.max_tokens
            }
            async with session.post(
                self.api_url,
                json=payload,
                headers=headers
            ) as response:
                data = await response.json()
                return self._parse_response(data)

    def _format_prompt(self, prompt):
        """Convert to provider's format"""
        if isinstance(prompt, str):
            return [{"role": "user", "content": prompt}]
        return prompt

    def _parse_response(self, data):
        """Extract content from provider's response"""
        return {
            "content": data['choices'][0]['message']['content'],
            "model": self.model_name,
            "finish_reason": data['choices'][0]['finish_reason']
        }

    def is_answer_blocked(self, answer: dict) -> bool:
        """Check if response was blocked by safety filters"""
        return answer.get('finish_reason') == 'content_filter'

    def invoke(self, prompt):
        """Sync wrapper"""
        import asyncio
        return asyncio.run(self.ainvoke(prompt))

    # Implement other required methods...
Safety Filters¶
Detecting Blocked Responses¶
Override is_answer_blocked to detect safety filter activations:
from hivetracered.models import Model

class SafetyAwareModel(Model):
    def is_answer_blocked(self, answer: dict) -> bool:
        # Check for safety filter indicators
        if answer.get('finish_reason') == 'content_filter':
            return True
        if 'blocked' in answer.get('content', '').lower():
            return True
        return False
This is used by the pipeline to track successful jailbreaks.
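To see the rule in isolation, the same check can be exercised as a plain function (a standalone sketch of the logic above, not library code):

```python
def is_answer_blocked(answer: dict) -> bool:
    # Explicit safety-filter stop reason
    if answer.get("finish_reason") == "content_filter":
        return True
    # Heuristic: the response text itself mentions being blocked
    if "blocked" in answer.get("content", "").lower():
        return True
    return False

print(is_answer_blocked({"finish_reason": "content_filter"}))             # True
print(is_answer_blocked({"content": "Request was BLOCKED by policy"}))    # True
print(is_answer_blocked({"content": "Hello!", "finish_reason": "stop"}))  # False
```

Note the content check is case-insensitive and can produce false positives on benign mentions of "blocked"; tune the heuristics to your provider's actual refusal signals.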
Error Handling¶
Implement robust error handling:
async def ainvoke(self, prompt):
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return await self._call_api(prompt)
        except RateLimitError:
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise
        except APIError as e:
            logger.error(f"API error: {e}")
            raise
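The snippet above leaves RateLimitError, APIError, and logger to your provider SDK. A self-contained version of the same backoff pattern, with placeholder exception types and a configurable base delay (all names here are illustrative, not part of HiveTraceRed):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

class RateLimitError(Exception):
    pass

class APIError(Exception):
    pass

async def call_with_retries(call, prompt, max_retries=3, base_delay=1.0):
    for attempt in range(max_retries):
        try:
            return await call(prompt)
        except RateLimitError:
            if attempt < max_retries - 1:
                # Exponential backoff: base_delay, 2*base_delay, 4*base_delay, ...
                await asyncio.sleep(base_delay * 2 ** attempt)
            else:
                raise
        except APIError as e:
            logger.error(f"API error: {e}")
            raise

# Demo with a flaky stub that rate-limits twice, then succeeds
attempts = {"n": 0}

async def flaky(prompt):
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RateLimitError("slow down")
    return {"content": f"ok: {prompt}"}

result = asyncio.run(call_with_retries(flaky, "hi", base_delay=0.01))
print(result["content"], attempts["n"])  # ok: hi 3
```

Retrying only rate limits while re-raising other API errors immediately keeps transient failures invisible to the pipeline without masking real bugs.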
Registering Custom Models¶
Add to the model registry for use in configuration files:
# In pipeline/constants.py
from hivetracered.models.my_custom_model import MyCustomModel
MODEL_CLASSES = {
    "my-custom-model": MyCustomModel,
    "gpt-4.1": OpenAIModel,
    # ... other models
}
Then use in configuration:
response_model:
  name: my-custom-model
  params:
    api_key: YOUR_KEY
    custom_param: value
Web-Based Models (Browser Automation)¶
For AI models without an official API, you can use the WebModel framework to automate browser interactions using Playwright.
Overview¶
WebModel enables testing of web-based chat interfaces:
Chat interfaces without public APIs
Custom AI deployments
On-premise models with web UIs only
Quick Example¶
from hivetracered.models import MistralWebModel
# Mistral Le Chat example (built-in)
model = MistralWebModel(
    model="mistral-large",
    headless=False,  # Show browser for debugging
    max_concurrency=2
)

response = model.invoke("Explain quantum computing")
print(response["content"])

model.close()  # Always close browser
Creating Custom Web Models¶
Use the Web Action Recorder to identify UI elements, then create your model:
from hivetracered.models.web_model import WebModel
from playwright.async_api import Page
class MyWebModel(WebModel):
    def __init__(self, **kwargs):
        super().__init__(model="my-chat", **kwargs)
        self.target_url = "https://your-chat.com"

    async def _send_message_and_get_response(
        self, page: Page, message: str
    ) -> str:
        # Find input, send message, wait for response
        input_elem = await page.wait_for_selector('textarea#chat-input')
        await input_elem.type(message)
        await page.keyboard.press('Enter')

        # Wait for stable response
        return await self._wait_for_stable_response(
            page,
            'div.response-message'
        )
See the Web-Based Models guide for detailed instructions on:
Using the Web Action Recorder to capture UI interactions
Creating custom web model implementations
Handling consent dialogs, login flows, and streaming responses
Best practices for browser automation
Best Practices¶
Handle Both Sync and Async: Implement both invoke and ainvoke
Support Message Formats: Handle both string and message list inputs
Implement Batching: Use batching for efficiency
Add Error Handling: Implement retries and proper error messages
Detect Safety Filters: Override is_answer_blocked appropriately
See Also¶
Models API - API documentation
Running the Pipeline - Pipeline usage
Web-Based Models - Web-based models and action recorder guide