Python Client
Official SDK
v0.1.0
Python 3.8+
Getting Started
Installation
Using pip:
pip install solidb
Using Poetry:
poetry add solidb
Requirements: Python 3.8 or higher. The client uses msgpack for binary serialization.
Quick Start
from solidb import Client
# Create client instance
client = Client("127.0.0.1", 6745)
# Establish connection
client.connect()
# Authenticate
client.auth("_system", "admin", "password")
# Set database context (required for sub-clients)
client.use_database("mydb")
# Basic CRUD operations
doc = client.insert("mydb", "users", {"name": "Alice", "age": 30})
print(f"Created: {doc['_key']}")
user = client.get("mydb", "users", doc["_key"])
print(f"Retrieved: {user['name']}")
client.update("mydb", "users", doc["_key"], {"age": 31})
# Query with SDBQL
results = client.query("mydb", """
    FOR u IN users
        FILTER u.age > @min
        RETURN u
""", {"min": 25})
print(f"Found {len(results)} users")
# Use management sub-clients
scripts = client.scripts.list()
triggers = client.triggers.list()
# Clean up
client.close()
Connection Management
# Initialize with host and port
client = Client("127.0.0.1", 6745)
# Connect (establishes TCP socket)
client.connect()
# Check connection health (returns latency in ms)
latency = client.ping()
print(f"Latency: {latency:.2f}ms")
# Close connection when done
client.close()
# Context manager support
with Client("127.0.0.1", 6745) as client:
    client.auth("_system", "admin", "password")
    # ... operations
# Auto-closes on exit
| Method | Returns | Description |
|---|---|---|
| Client(host, port) | Client | Create client instance |
| connect() | None | Establish TCP connection |
| ping() | float | Latency in milliseconds |
| close() | None | Close connection |
| use_database(name) | Client | Set database context for sub-clients |
Authentication
# Authenticate with database, username, and password
client.auth("_system", "admin", "password")
# Authentication is required for most operations
# The session remains authenticated until disconnected
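Because the session only survives as long as its socket, long-lived code must re-authenticate after reconnecting. A minimal sketch of that pattern, assuming ping() raises the ConnectionError documented in the error-handling section below (ensure_session is a hypothetical helper, not part of the SDK):

from solidb.exceptions import ConnectionError as SolidbConnectionError

def ensure_session(client, database, user, password):
    # Hypothetical helper: re-establish the socket and re-auth if it dropped
    try:
        client.ping()
    except SolidbConnectionError:
        client.connect()                       # Re-open the TCP socket
        client.auth(database, user, password)  # Auth is per-connection
    return client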
Core Operations
Database Operations
# List all databases
databases = client.list_databases()
# => ["_system", "mydb", "testdb"]
# Create a new database
client.create_database("analytics")
# Delete a database
client.delete_database("old_db")
| Method | Returns | Description |
|---|---|---|
| list_databases() | List[str] | List all database names |
| create_database(name) | None | Create new database |
| delete_database(name) | None | Delete database |
Collection Operations
# List collections in a database
collections = client.list_collections("mydb")
# => [{"name": "users", "type": "document"}, ...]
# Create a document collection
client.create_collection("mydb", "products")
# Create an edge collection (for graphs)
client.create_collection("mydb", "relationships", col_type="edge")
# Delete a collection
client.delete_collection("mydb", "old_collection")
| Method | Returns | Description |
|---|---|---|
| list_collections(db) | List[Dict] | List collections in database |
| create_collection(db, name, col_type=None) | None | Create collection (col_type: None for document, "edge" for edge) |
| delete_collection(db, name) | None | Delete collection |
Document Operations (CRUD)
# INSERT - Create a new document
doc = client.insert("mydb", "users", {
    "name": "Alice",
    "email": "alice@example.com",
    "age": 30
})
print(doc["_key"]) # Auto-generated key
# INSERT with custom key
doc = client.insert("mydb", "users", {"name": "Bob"}, key="custom-key-123")
# GET - Retrieve a document by key
user = client.get("mydb", "users", "custom-key-123")
# => {"_key": "custom-key-123", "name": "Bob", ...}
# UPDATE - Modify a document (merge by default)
client.update("mydb", "users", "custom-key-123", {"age": 25})
# UPDATE - Replace entire document (merge=False)
client.update("mydb", "users", "custom-key-123", {"name": "Robert"}, merge=False)
# DELETE - Remove a document
client.delete("mydb", "users", "custom-key-123")
# LIST - Paginated document listing
docs = client.list("mydb", "users", limit=50, offset=0)
| Method | Returns | Description |
|---|---|---|
| insert(db, col, doc, key=None) | Dict | Insert document, returns doc with _key |
| get(db, col, key) | Dict | Get document by key |
| update(db, col, key, doc, merge=True) | None | Update document (merge or replace) |
| delete(db, col, key) | None | Delete document |
| list(db, col, limit=50, offset=0) | List[Dict] | List documents with pagination |
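The client exposes no native upsert, but one can be composed from the calls above. A sketch, assuming a missing key surfaces as the ServerError documented in the error-handling section below:

from solidb.exceptions import ServerError

def upsert(client, db, col, key, doc):
    # Hypothetical upsert: merge into an existing document, insert otherwise
    try:
        client.get(db, col, key)
    except ServerError:
        return client.insert(db, col, doc, key=key)
    client.update(db, col, key, doc)  # merge=True by default
    return client.get(db, col, key)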
SDBQL Queries
# Simple query
users = client.query("mydb", "FOR u IN users RETURN u")
# Query with bind variables (recommended for security)
results = client.query("mydb", """
    FOR u IN users
        FILTER u.age >= @min_age AND u.status == @status
        SORT u.created_at DESC
        LIMIT @limit
        RETURN { name: u.name, email: u.email }
""", {
    "min_age": 18,
    "status": "active",
    "limit": 100
})
# Aggregation query
stats = client.query("mydb", """
    FOR u IN users
        COLLECT status = u.status WITH COUNT INTO count
        RETURN { status, count }
""")
# Join query
orders = client.query("mydb", """
    FOR o IN orders
        FOR u IN users FILTER u._key == o.user_id
            RETURN { order: o, user: u.name }
""")
# Explain query plan (for optimization)
plan = client.explain("mydb", "FOR u IN users FILTER u.age > 25 RETURN u")
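Large result sets are easier on memory when fetched in pages. A sketch of a batched reader, assuming SDBQL accepts the AQL-style two-argument LIMIT @offset, @count form (iter_users is a hypothetical helper, not part of the SDK):

def iter_users(client, db, page_size=100):
    # Hypothetical pager: yield documents in fixed-size batches
    offset = 0
    while True:
        batch = client.query(db, """
            FOR u IN users
                SORT u._key
                LIMIT @offset, @count
                RETURN u
        """, {"offset": offset, "count": page_size})
        if not batch:
            break
        yield from batch
        offset += page_size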
ACID Transactions
# Begin a transaction
tx_id = client.begin_transaction("mydb", isolation="read_committed")
# Isolation levels: read_uncommitted, read_committed, repeatable_read, serializable
try:
    # Perform operations within transaction
    client.insert("mydb", "accounts", {"id": 1, "balance": 1000})
    client.insert("mydb", "accounts", {"id": 2, "balance": 500})
    # Commit if all operations succeed
    client.commit_transaction(tx_id)
    print("Transaction committed")
except Exception as e:
    # Rollback on any error
    client.rollback_transaction(tx_id)
    print(f"Transaction rolled back: {e}")
| Method | Returns | Description |
|---|---|---|
| begin_transaction(db, isolation=None) | str | Start transaction, returns tx_id |
| commit_transaction(tx_id) | None | Commit transaction |
| rollback_transaction(tx_id) | None | Rollback transaction |
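The begin/commit/rollback triple maps naturally onto a context manager, which removes the risk of forgetting the rollback branch. A minimal sketch (transaction is a hypothetical wrapper, not part of the SDK):

from contextlib import contextmanager

@contextmanager
def transaction(client, db, isolation="read_committed"):
    # Hypothetical wrapper: commit on success, roll back on any exception
    tx_id = client.begin_transaction(db, isolation=isolation)
    try:
        yield tx_id
        client.commit_transaction(tx_id)
    except Exception:
        client.rollback_transaction(tx_id)
        raise

# Usage: the rollback happens automatically if the body raises
with transaction(client, "mydb") as tx_id:
    client.insert("mydb", "accounts", {"id": 3, "balance": 250})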
Index Management
# Create an index
client.create_index("mydb", "users", "idx_email", ["email"], unique=True, sparse=False)
# List indexes on a collection
indexes = client.list_indexes("mydb", "users")
# Delete an index
client.delete_index("mydb", "users", "idx_email")
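Setup scripts that run repeatedly should not fail on an index that already exists. A sketch of an idempotent create step, assuming the dicts returned by list_indexes carry a name field (ensure_index is a hypothetical helper):

def ensure_index(client, db, col, name, fields, **opts):
    # Hypothetical helper: create the index only if it is not already there
    if any(ix.get("name") == name for ix in client.list_indexes(db, col)):
        return
    client.create_index(db, col, name, fields, **opts)

ensure_index(client, "mydb", "users", "idx_email", ["email"], unique=True)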
Management Sub-Clients
Sub-clients provide namespaced access to management APIs.
Important: Call use_database(name) first to set the database context.
client.scripts
Lua Script Endpoints
client.use_database("mydb")
# Create a Lua script endpoint
script = client.scripts.create(
    name="hello",
    path="/api/hello",
    methods=["GET", "POST"],
    code='return { message = "Hello, " .. (req.params.name or "World") }',
    description="Greeting endpoint",  # optional
    collection="users"  # optional: restrict to collection
)
print(f"Created script: {script['_key']}")
# List all scripts
scripts = client.scripts.list()
for s in scripts:
    print(f"{s['name']} -> {s['path']}")
# Get a specific script
script = client.scripts.get("script_key")
# Update script code
client.scripts.update("script_key", {
    "code": 'return { message = "Updated!" }',
    "methods": ["GET"]
})
# Delete a script
client.scripts.delete("script_key")
# Get execution statistics
stats = client.scripts.get_stats()
print(f"Total calls: {stats['total_calls']}")
| Method | Parameters | Description |
|---|---|---|
| create | name, path, methods, code, description (optional), collection (optional) | Create Lua endpoint |
| list() | - | List all scripts |
| get(script_id) | script_id | Get script details |
| update(script_id, updates) | script_id, dict | Update script properties |
| delete(script_id) | script_id | Delete script |
| get_stats() | - | Execution statistics |
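Script deployments are often re-run from CI, so a create-or-update step keeps them idempotent. A sketch, assuming the dicts returned by list() carry the name and _key fields shown in the examples above (deploy_script is a hypothetical helper):

def deploy_script(client, name, path, methods, code):
    # Hypothetical deploy step: update the script if it exists, else create it
    for s in client.scripts.list():
        if s["name"] == name:
            client.scripts.update(s["_key"],
                                  {"path": path, "methods": methods, "code": code})
            return s["_key"]
    return client.scripts.create(name=name, path=path,
                                 methods=methods, code=code)["_key"]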
client.jobs & client.cron
Background Processing
client.use_database("mydb")
# === JOBS ===
# List all queues
queues = client.jobs.list_queues()
# => [{"name": "default", "pending": 5, "running": 2}, ...]
# List jobs in a queue with filters
jobs = client.jobs.list_jobs("default",
    status="pending",  # pending, running, completed, failed
    limit=50,
    offset=0
)
# Enqueue a new job
job = client.jobs.enqueue("default",
    script_path="/scripts/process-order",
    params={"order_id": 12345},
    priority=10,  # optional: higher = more urgent
    run_at=None  # optional: ISO8601 datetime for delayed execution
)
print(f"Job ID: {job['_key']}")
# Get job details
job = client.jobs.get_job("job_id")
print(f"Status: {job['status']}")
# Cancel a pending job
client.jobs.cancel("job_id")
# === CRON ===
# List scheduled jobs
crons = client.cron.list()
# Create a cron job
cron = client.cron.create(
    name="daily-cleanup",
    schedule="0 2 * * *",  # Every day at 2 AM
    script_path="/scripts/cleanup",
    params={"days_old": 30},  # optional
    description="Remove old records"  # optional
)
# Get cron job details
cron = client.cron.get("cron_id")
# Update cron schedule
client.cron.update("cron_id", {"schedule": "0 3 * * *"})
# Delete cron job
client.cron.delete("cron_id")
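Jobs run asynchronously, so a caller that needs the outcome has to poll get_job. A sketch that blocks until a terminal status, using the status values listed above (wait_for_job and its intervals are assumptions, not part of the SDK):

import time

def wait_for_job(client, job_id, timeout=60.0, interval=0.5):
    # Hypothetical poller: return the job once it completes or fails
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        job = client.jobs.get_job(job_id)
        if job["status"] in ("completed", "failed"):
            return job
        time.sleep(interval)
    raise TimeoutError(f"job {job_id} not finished after {timeout}s")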
client.triggers
Database Triggers
client.use_database("mydb")
# List all triggers
triggers = client.triggers.list()
# List triggers for a specific collection
triggers = client.triggers.list_by_collection("users")
# Create a trigger
trigger = client.triggers.create(
    name="on_user_created",
    collection="users",
    operation="insert",  # insert, update, delete
    script_path="/scripts/on-user-create",
    description="Handle new user"  # optional
)
# Get trigger details
trigger = client.triggers.get("trigger_id")
# Update trigger
client.triggers.update("trigger_id", {
    "script_path": "/scripts/new-handler",
    "enabled": False
})
# Toggle trigger on/off
client.triggers.toggle("trigger_id")
# Delete trigger
client.triggers.delete("trigger_id")
| Event | Description |
|---|---|
| insert | Fires on document creation |
| update | Fires on document modification |
| delete | Fires on document removal |
client.roles & client.users
Role-Based Access Control
# === ROLES ===
# List all roles
roles = client.roles.list()
# Create a role with permissions
role = client.roles.create(
    name="editor",
    permissions=[
        {"action": "read", "scope": "database", "database": "mydb"},
        {"action": "write", "scope": "collection", "database": "mydb", "collection": "articles"},
        {"action": "execute", "scope": "script", "database": "mydb"}
    ],
    description="Content editor role"
)
# Get role details
role = client.roles.get("editor")
# Update role permissions
client.roles.update("editor", permissions=[
    {"action": "read", "scope": "database", "database": "mydb"},
    {"action": "write", "scope": "database", "database": "mydb"}
])
# Delete role
client.roles.delete("editor")
# === USERS ===
# List all users
users = client.users.list()
# Create a user
user = client.users.create(
    username="john",
    password="secure_password",
    roles=["editor", "viewer"]  # optional
)
# Get user's assigned roles
roles = client.users.get_roles("john")
# Assign a role to user
client.users.assign_role("john", "admin")
# Revoke a role from user
client.users.revoke_role("john", "admin")
# Get current authenticated user
me = client.users.me()
# Get current user's permissions
permissions = client.users.my_permissions()
# Delete user
client.users.delete("john")
| Action | Scopes | Description |
|---|---|---|
| read | database, collection | Read documents and query |
| write | database, collection | Create, update, delete documents |
| admin | database, collection | Manage indexes, schema, etc. |
| execute | script | Execute Lua scripts |
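A common bootstrap is a read-only reporting account: one role carrying a single database-scoped read permission, assigned to a fresh user. A sketch built only from the calls above (the role and user names are illustrative):

client.roles.create(
    name="reporter",
    permissions=[{"action": "read", "scope": "database", "database": "mydb"}],
    description="Read-only access for dashboards"
)
client.users.create(username="dashboard", password="change-me",
                    roles=["reporter"])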
client.api_keys
API Key Management
# Create API key
key = client.api_keys.create(
    name="my-api-key",
    databases=["mydb"],
    expiration=None  # optional: ISO8601 datetime
)
print(f"API Key: {key['key']}") # Save this! Only shown once
# List all API keys
keys = client.api_keys.list()
# Delete API key
client.api_keys.delete("key_id")
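Since expiration takes an ISO8601 datetime, a key with a bounded lifetime can be issued as below. A sketch (the 30-day window and UTC formatting are illustrative choices):

from datetime import datetime, timedelta, timezone

expires = (datetime.now(timezone.utc) + timedelta(days=30)).isoformat()
key = client.api_keys.create(
    name="ci-pipeline",
    databases=["mydb"],
    expiration=expires  # Key becomes invalid after this timestamp
)
print(key["key"])  # Shown once; store it in a secrets manager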
Advanced Features
client.vector
Vector Search & AI
client.use_database("mydb")
# Create a vector index
index = client.vector.create_index(
    collection="products",
    name="product_embeddings",
    field="embedding",
    dimensions=1536,
    metric="cosine"  # cosine, euclidean, dot_product
)
# Search by vector (semantic search)
embedding = get_embedding("wireless headphones") # Your embedding function
results = client.vector.search(
    collection="products",
    vector=embedding,
    limit=10,
    filter='doc.category == "electronics"'  # optional SDBQL filter
)
for result in results:
    print(f"{result['doc']['name']} - Score: {result['score']}")
# Search by existing document (find similar)
similar = client.vector.search_by_document(
    collection="products",
    doc_key="product-123",
    field="embedding",
    limit=5
)
# Quantize index (reduce memory usage)
client.vector.quantize("products", "product_embeddings", "binary")
# Dequantize (restore full precision)
client.vector.dequantize("products", "product_embeddings")
# Get index info
info = client.vector.get_index_info("products", "product_embeddings")
# List vector indexes
indexes = client.vector.list_indexes("products")
# Delete index
client.vector.delete_index("products", "product_embeddings")
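End to end, semantic search means writing documents whose embedding field matches the index dimensions, then querying with a vector from the same model. A sketch (embed is a placeholder for your embedding provider, as in the example above; it must return 1536 floats to match the index):

def embed(text):
    # Placeholder: call your embedding model here (1536-dim output assumed)
    raise NotImplementedError("plug in your embedding provider")

for name in ("wireless headphones", "usb-c cable"):
    client.insert("mydb", "products", {
        "name": name,
        "category": "electronics",
        "embedding": embed(name)  # Dimensions must match the index
    })

for hit in client.vector.search(collection="products",
                                vector=embed("bluetooth audio"), limit=3):
    print(hit["doc"]["name"], hit["score"])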
client.geo
Geospatial Queries
client.use_database("mydb")
# Create a geo index
client.geo.create_index(
    collection="stores",
    name="location_idx",
    field="location",
    geo_json=True  # optional: True if using GeoJSON format
)
# Find nearby locations (radius search)
nearby = client.geo.near(
    collection="stores",
    latitude=48.8566,
    longitude=2.3522,
    radius=5000,  # meters
    limit=20
)
for result in nearby:
    print(f"{result['doc']['name']} - {result['distance']}m away")
# Find within polygon
polygon = {
    "type": "Polygon",
    "coordinates": [[[2.3, 48.8], [2.4, 48.8], [2.4, 48.9], [2.3, 48.9], [2.3, 48.8]]]
}
within = client.geo.within("stores", geometry=polygon)
# List geo indexes
indexes = client.geo.list_indexes("stores")
# Delete index
client.geo.delete_index("stores", "location_idx")
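For near and within to match anything, the indexed field has to contain coordinates. With geo_json=True that means GeoJSON geometry, which orders coordinates as [longitude, latitude]. A sketch inserting a store as a GeoJSON Point (the accepted geometry shapes are an assumption based on the geo_json flag):

client.insert("mydb", "stores", {
    "name": "Louvre Kiosk",
    # GeoJSON order is [longitude, latitude], the reverse of near()'s args
    "location": {"type": "Point", "coordinates": [2.3522, 48.8566]}
})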
client.ttl
Time-To-Live Indexes
client.use_database("mydb")
# Create TTL index (auto-expire documents)
client.ttl.create_index(
    collection="sessions",
    name="session_ttl",
    field="created_at",  # DateTime field to check
    expire_after_seconds=3600  # Expire after 1 hour
)
# Update expiration time
client.ttl.update_expiration("sessions", "session_ttl", 7200) # 2 hours
# Get index info
info = client.ttl.get_index_info("sessions", "session_ttl")
print(f"Expires after: {info['expire_after_seconds']}s")
# Manually trigger cleanup (normally runs automatically)
result = client.ttl.run_cleanup("sessions")
print(f"Deleted {result['deleted']} expired documents")
# List TTL indexes
indexes = client.ttl.list_indexes("sessions")
# Delete TTL index
client.ttl.delete_index("sessions", "session_ttl")
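Expiry is driven entirely by the indexed datetime field, so documents must set it on write. A sketch that stores the creation time in ISO8601 UTC (the exact timestamp format the server parses is an assumption):

from datetime import datetime, timezone

client.insert("mydb", "sessions", {
    "user": "alice",
    "token": "opaque-session-token",
    # The TTL index compares this field against expire_after_seconds
    "created_at": datetime.now(timezone.utc).isoformat()
})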
client.columnar
Columnar/Analytics Storage
client.use_database("mydb")
# Create a columnar table (optimized for analytics)
table = client.columnar.create("metrics", [
    {"name": "timestamp", "type": "datetime"},
    {"name": "metric_name", "type": "string"},
    {"name": "value", "type": "float"},
    {"name": "tags", "type": "string"}
])
# Insert rows (batch insert is efficient)
client.columnar.insert("metrics", [
    {"timestamp": "2024-01-15T10:00:00Z", "metric_name": "cpu_usage", "value": 45.2, "tags": "server1"},
    {"timestamp": "2024-01-15T10:01:00Z", "metric_name": "cpu_usage", "value": 47.8, "tags": "server1"},
    {"timestamp": "2024-01-15T10:00:00Z", "metric_name": "memory", "value": 72.1, "tags": "server1"}
])
# Query with SQL-like syntax
results = client.columnar.query("metrics",
    "SELECT * FROM metrics WHERE value > @min ORDER BY timestamp DESC LIMIT 100",
    params={"min": 40.0}
)
# Aggregation
agg = client.columnar.aggregate("metrics", {
    "group_by": ["metric_name", "tags"],
    "metrics": [
        {"column": "value", "function": "avg"},
        {"column": "value", "function": "max"},
        {"column": "value", "function": "min"},
        {"column": "value", "function": "count"}
    ],
    "filters": {"metric_name": "cpu_usage"}  # optional
})
# Get table statistics
stats = client.columnar.stats("metrics")
print(f"Row count: {stats['row_count']}, Size: {stats['size_bytes']}")
# Add a column
client.columnar.add_column("metrics", "host", "string", default_value="unknown")
# Drop a column
client.columnar.drop_column("metrics", "host")
# List all columnar tables
tables = client.columnar.list()
# Delete table
client.columnar.delete("metrics")
client.cluster
Cluster Management
# Get cluster status
status = client.cluster.status()
print(f"Mode: {status['mode']}") # standalone, cluster
print(f"Nodes: {status['node_count']}")
# Get detailed cluster info
info = client.cluster.info()
# Remove a node from cluster
client.cluster.remove_node("node-id-to-remove")
# Trigger data rebalancing
client.cluster.rebalance()
# Cleanup orphaned data
client.cluster.cleanup()
# Reshard cluster
client.cluster.reshard(num_shards=16) # new number of shards
client.collections_mgmt
Advanced Collection Operations
client.use_database("mydb")
# Truncate collection (delete all documents)
client.collections_mgmt.truncate("logs")
# Compact collection (reclaim disk space)
client.collections_mgmt.compact("users")
# Repair collection (fix inconsistencies)
client.collections_mgmt.repair("orders")
# Get collection statistics
stats = client.collections_mgmt.stats("users")
# Set JSON schema validation
client.collections_mgmt.set_schema("users", {
    "type": "object",
    "required": ["name", "email"],
    "properties": {
        "name": {"type": "string", "minLength": 1},
        "email": {"type": "string", "format": "email"},
        "age": {"type": "integer", "minimum": 0}
    }
})
# Get current schema
schema = client.collections_mgmt.get_schema("users")
# Remove schema validation
client.collections_mgmt.delete_schema("users")
# Export collection
data = client.collections_mgmt.export("users", format="json")
# Import data
client.collections_mgmt.import_data("users_backup", data, format="json")
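Once a schema is set, writes that violate it should be rejected server-side. A sketch, assuming the rejection surfaces as the ServerError documented below:

from solidb.exceptions import ServerError

try:
    client.insert("mydb", "users", {"name": ""})  # Empty name, missing email
except ServerError as e:
    print(f"Rejected by schema validation: {e}")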
client.env
Environment Variables
client.use_database("mydb")
# List environment variables (for Lua scripts)
vars = client.env.list()
# Set an environment variable
client.env.set("API_KEY", "sk-xxx-your-api-key")
client.env.set("WEBHOOK_URL", "https://example.com/webhook")
# Delete an environment variable
client.env.delete("OLD_VAR")
Error Handling
from solidb import Client
from solidb.exceptions import ConnectionError, ServerError, ProtocolError
client = Client("127.0.0.1", 6745)
try:
    client.connect()
    client.auth("mydb", "user", "password")
    doc = client.get("mydb", "users", "nonexistent-key")
except ConnectionError as e:
    # Network/connection issues
    print(f"Connection failed: {e}")
except ServerError as e:
    # Server-side errors (not found, validation, etc.)
    print(f"Server error: {e}")
except ProtocolError as e:
    # Protocol/serialization errors
    print(f"Protocol error: {e}")
finally:
    client.close()
ConnectionError
Network failures, connection refused, timeouts, disconnections
ServerError
Document not found, permission denied, validation errors
ProtocolError
Invalid response format, message too large, serialization issues
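Of the three, ConnectionError is the transient class worth retrying; ServerError and ProtocolError usually indicate a bug or bad input. A sketch of bounded retries with exponential backoff (with_retries is a hypothetical helper, not part of the SDK):

import time
from solidb.exceptions import ConnectionError as SolidbConnectionError

def with_retries(op, attempts=3, base_delay=0.2):
    # Hypothetical helper: retry transient connection failures with backoff
    for attempt in range(attempts):
        try:
            return op()
        except SolidbConnectionError:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)  # 0.2s, 0.4s, 0.8s, ...

doc = with_retries(lambda: client.get("mydb", "users", "custom-key-123"))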