Python Client

Official SDK · v0.1.0 · Python 3.8+

Getting Started

Installation

Using pip:

pip install solidb

Using Poetry:

poetry add solidb

Requirements: Python 3.8 or higher. The client uses msgpack for binary serialization.

Quick Start

from solidb import Client

# Create client instance
client = Client("127.0.0.1", 6745)

# Establish connection
client.connect()

# Authenticate
client.auth("_system", "admin", "password")

# Set database context (required for sub-clients)
client.use_database("mydb")

# Basic CRUD operations
doc = client.insert("mydb", "users", {"name": "Alice", "age": 30})
print(f"Created: {doc['_key']}")

user = client.get("mydb", "users", doc["_key"])
print(f"Retrieved: {user['name']}")

client.update("mydb", "users", doc["_key"], {"age": 31})

# Query with SDBQL
results = client.query("mydb", """
    FOR u IN users
    FILTER u.age > @min
    RETURN u
""", {"min": 25})
print(f"Found {len(results)} users")

# Use management sub-clients
scripts = client.scripts.list()
triggers = client.triggers.list()

# Clean up
client.close()

Connection Management

# Initialize with host and port
client = Client("127.0.0.1", 6745)

# Connect (establishes TCP socket)
client.connect()

# Check connection health (returns latency in ms)
latency = client.ping()
print(f"Latency: {latency:.2f}ms")

# Close connection when done
client.close()

# Context manager support
with Client("127.0.0.1", 6745) as client:
    client.auth("_system", "admin", "password")
    # ... operations
# Auto-closes on exit

| Method | Returns | Description |
| --- | --- | --- |
| Client(host, port) | Client | Create client instance |
| connect() | None | Establish TCP connection |
| ping() | float | Latency in milliseconds |
| close() | None | Close connection |
| use_database(name) | Client | Set database context for sub-clients |

Authentication

# Authenticate with database, username, and password
client.auth("_system", "admin", "password")

# Authentication is required for most operations
# The session remains authenticated until disconnected

Core Operations

Database Operations

# List all databases
databases = client.list_databases()
# => ["_system", "mydb", "testdb"]

# Create a new database
client.create_database("analytics")

# Delete a database
client.delete_database("old_db")

| Method | Returns | Description |
| --- | --- | --- |
| list_databases() | List[str] | List all database names |
| create_database(name) | None | Create new database |
| delete_database(name) | None | Delete database |

Collection Operations

# List collections in a database
collections = client.list_collections("mydb")
# => [{"name": "users", "type": "document"}, ...]

# Create a document collection
client.create_collection("mydb", "products")

# Create an edge collection (for graphs)
client.create_collection("mydb", "relationships", col_type="edge")

# Delete a collection
client.delete_collection("mydb", "old_collection")
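
| Method | Returns | Description |
| --- | --- | --- |
| list_collections(db) | List[Dict] | List collections in database |
| create_collection(db, name, col_type=None) | None | Create collection (col_type: None for a document collection, "edge" for an edge collection) |
| delete_collection(db, name) | None | Delete collection |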

Document Operations (CRUD)

# INSERT - Create a new document
doc = client.insert("mydb", "users", {
    "name": "Alice",
    "email": "alice@example.com",
    "age": 30
})
print(doc["_key"])  # Auto-generated key

# INSERT with custom key
doc = client.insert("mydb", "users", {"name": "Bob"}, key="custom-key-123")

# GET - Retrieve a document by key
user = client.get("mydb", "users", "custom-key-123")
# => {"_key": "custom-key-123", "name": "Bob", ...}

# UPDATE - Modify a document (merge by default)
client.update("mydb", "users", "custom-key-123", {"age": 25})

# UPDATE - Replace entire document (merge=False)
client.update("mydb", "users", "custom-key-123", {"name": "Robert"}, merge=False)

# DELETE - Remove a document
client.delete("mydb", "users", "custom-key-123")

# LIST - Paginated document listing
docs = client.list("mydb", "users", limit=50, offset=0)

| Method | Returns | Description |
| --- | --- | --- |
| insert(db, col, doc, key=None) | Dict | Insert document, returns doc with _key |
| get(db, col, key) | Dict | Get document by key |
| update(db, col, key, doc, merge=True) | None | Update document (merge or replace) |
| delete(db, col, key) | None | Delete document |
| list(db, col, limit=50, offset=0) | List[Dict] | List documents with pagination |

SDBQL Queries

# Simple query
users = client.query("mydb", "FOR u IN users RETURN u")

# Query with bind variables (recommended for security)
results = client.query("mydb", """
    FOR u IN users
    FILTER u.age >= @min_age AND u.status == @status
    SORT u.created_at DESC
    LIMIT @limit
    RETURN { name: u.name, email: u.email }
""", {
    "min_age": 18,
    "status": "active",
    "limit": 100
})

# Aggregation query
stats = client.query("mydb", """
    FOR u IN users
    COLLECT status = u.status WITH COUNT INTO count
    RETURN { status, count }
""")

# Join query
orders = client.query("mydb", """
    FOR o IN orders
    FOR u IN users FILTER u._key == o.user_id
    RETURN { order: o, user: u.name }
""")

# Explain query plan (for optimization)
plan = client.explain("mydb", "FOR u IN users FILTER u.age > 25 RETURN u")

ACID Transactions

# Begin a transaction
tx_id = client.begin_transaction("mydb", isolation="read_committed")
# Isolation levels: read_uncommitted, read_committed, repeatable_read, serializable

try:
    # Perform operations within transaction
    client.insert("mydb", "accounts", {"id": 1, "balance": 1000})
    client.insert("mydb", "accounts", {"id": 2, "balance": 500})

    # Commit if all operations succeed
    client.commit_transaction(tx_id)
    print("Transaction committed")
except Exception as e:
    # Rollback on any error
    client.rollback_transaction(tx_id)
    print(f"Transaction rolled back: {e}")

| Method | Returns | Description |
| --- | --- | --- |
| begin_transaction(db, isolation=None) | str | Start transaction, returns tx_id |
| commit_transaction(tx_id) | None | Commit transaction |
| rollback_transaction(tx_id) | None | Rollback transaction |

Index Management

# Create an index
client.create_index("mydb", "users", "idx_email", ["email"], unique=True, sparse=False)

# List indexes on a collection
indexes = client.list_indexes("mydb", "users")

# Delete an index
client.delete_index("mydb", "users", "idx_email")

Management Sub-Clients

Sub-clients provide namespaced access to management APIs. Important: Call use_database(name) first to set the database context.

client.scripts

Lua Script Endpoints
client.use_database("mydb")

# Create a Lua script endpoint
script = client.scripts.create(
    name="hello",
    path="/api/hello",
    methods=["GET", "POST"],
    code='return { message = "Hello, " .. (req.params.name or "World") }',
    description="Greeting endpoint",  # optional
    collection="users"                 # optional: restrict to collection
)
print(f"Created script: {script['_key']}")

# List all scripts
scripts = client.scripts.list()
for s in scripts:
    print(f"{s['name']} -> {s['path']}")

# Get a specific script
script = client.scripts.get("script_key")

# Update script code
client.scripts.update("script_key", {
    "code": 'return { message = "Updated!" }',
    "methods": ["GET"]
})

# Delete a script
client.scripts.delete("script_key")

# Get execution statistics
stats = client.scripts.get_stats()
print(f"Total calls: {stats['total_calls']}")

| Method | Parameters | Description |
| --- | --- | --- |
| create(...) | name, path, methods, code, description (optional), collection (optional) | Create Lua endpoint |
| list() | - | List all scripts |
| get(script_id) | script_id | Get script details |
| update(script_id, updates) | script_id, dict | Update script properties |
| delete(script_id) | script_id | Delete script |
| get_stats() | - | Execution statistics |

client.jobs & client.cron

Background Processing
client.use_database("mydb")

# === JOBS ===

# List all queues
queues = client.jobs.list_queues()
# => [{"name": "default", "pending": 5, "running": 2}, ...]

# List jobs in a queue with filters
jobs = client.jobs.list_jobs("default",
    status="pending",  # pending, running, completed, failed
    limit=50,
    offset=0
)

# Enqueue a new job
job = client.jobs.enqueue("default",
    script_path="/scripts/process-order",
    params={"order_id": 12345},
    priority=10,        # optional: higher = more urgent
    run_at=None         # optional: ISO8601 datetime for delayed execution
)
print(f"Job ID: {job['_key']}")

# Get job details
job = client.jobs.get_job("job_id")
print(f"Status: {job['status']}")

# Cancel a pending job
client.jobs.cancel("job_id")

# === CRON ===

# List scheduled jobs
crons = client.cron.list()

# Create a cron job
cron = client.cron.create(
    name="daily-cleanup",
    schedule="0 2 * * *",              # Every day at 2 AM
    script_path="/scripts/cleanup",
    params={"days_old": 30},           # optional
    description="Remove old records"   # optional
)

# Get cron job details
cron = client.cron.get("cron_id")

# Update cron schedule
client.cron.update("cron_id", {"schedule": "0 3 * * *"})

# Delete cron job
client.cron.delete("cron_id")

client.triggers

Database Triggers
client.use_database("mydb")

# List all triggers
triggers = client.triggers.list()

# List triggers for a specific collection
triggers = client.triggers.list_by_collection("users")

# Create a trigger
trigger = client.triggers.create(
    name="on_user_created",
    collection="users",
    operation="insert",                  # insert, update, delete
    script_path="/scripts/on-user-create",
    description="Handle new user"        # optional
)

# Get trigger details
trigger = client.triggers.get("trigger_id")

# Update trigger
client.triggers.update("trigger_id", {
    "script_path": "/scripts/new-handler",
    "enabled": False
})

# Toggle trigger on/off
client.triggers.toggle("trigger_id")

# Delete trigger
client.triggers.delete("trigger_id")

| Event | Description |
| --- | --- |
| insert | Fires on document creation |
| update | Fires on document modification |
| delete | Fires on document removal |

client.roles & client.users

Role-Based Access Control
# === ROLES ===

# List all roles
roles = client.roles.list()

# Create a role with permissions
role = client.roles.create(
    name="editor",
    permissions=[
        {"action": "read", "scope": "database", "database": "mydb"},
        {"action": "write", "scope": "collection", "database": "mydb", "collection": "articles"},
        {"action": "execute", "scope": "script", "database": "mydb"}
    ],
    description="Content editor role"
)

# Get role details
role = client.roles.get("editor")

# Update role permissions
client.roles.update("editor", permissions=[
    {"action": "read", "scope": "database", "database": "mydb"},
    {"action": "write", "scope": "database", "database": "mydb"}
])

# Delete role
client.roles.delete("editor")

# === USERS ===

# List all users
users = client.users.list()

# Create a user
user = client.users.create(
    username="john",
    password="secure_password",
    roles=["editor", "viewer"]  # optional
)

# Get user's assigned roles
roles = client.users.get_roles("john")

# Assign a role to user
client.users.assign_role("john", "admin")

# Revoke a role from user
client.users.revoke_role("john", "admin")

# Get current authenticated user
me = client.users.me()

# Get current user's permissions
permissions = client.users.my_permissions()

# Delete user
client.users.delete("john")

| Action | Scopes | Description |
| --- | --- | --- |
| read | database, collection | Read documents and query |
| write | database, collection | Create, update, delete documents |
| admin | database, collection | Manage indexes, schema, etc. |
| execute | script | Execute Lua scripts |

client.api_keys

API Key Management
# Create API key
key = client.api_keys.create(
    name="my-api-key",
    databases=["mydb"],
    expiration=None  # optional: ISO8601 datetime
)
print(f"API Key: {key['key']}")  # Save this! Only shown once

# List all API keys
keys = client.api_keys.list()

# Delete API key
client.api_keys.delete("key_id")

Advanced Features

client.vector

Vector Search & AI
client.use_database("mydb")

# Create a vector index
index = client.vector.create_index(
    collection="products",
    name="product_embeddings",
    field="embedding",
    dimensions=1536,
    metric="cosine"  # cosine, euclidean, dot_product
)

# Search by vector (semantic search)
embedding = get_embedding("wireless headphones")  # Your embedding function
results = client.vector.search(
    collection="products",
    vector=embedding,
    limit=10,
    filter='doc.category == "electronics"'  # optional SDBQL filter
)

for result in results:
    print(f"{result['doc']['name']} - Score: {result['score']}")

# Search by existing document (find similar)
similar = client.vector.search_by_document(
    collection="products",
    doc_key="product-123",
    field="embedding",
    limit=5
)

# Quantize index (reduce memory usage)
client.vector.quantize("products", "product_embeddings", "binary")

# Dequantize (restore full precision)
client.vector.dequantize("products", "product_embeddings")

# Get index info
info = client.vector.get_index_info("products", "product_embeddings")

# List vector indexes
indexes = client.vector.list_indexes("products")

# Delete index
client.vector.delete_index("products", "product_embeddings")

client.geo

Geospatial Queries
client.use_database("mydb")

# Create a geo index
client.geo.create_index(
    collection="stores",
    name="location_idx",
    field="location",
    geo_json=True  # optional: true if using GeoJSON format
)

# Find nearby locations (radius search)
nearby = client.geo.near(
    collection="stores",
    latitude=48.8566,
    longitude=2.3522,
    radius=5000,  # meters
    limit=20
)

for result in nearby:
    print(f"{result['doc']['name']} - {result['distance']}m away")

# Find within polygon
polygon = {
    "type": "Polygon",
    "coordinates": [[[2.3, 48.8], [2.4, 48.8], [2.4, 48.9], [2.3, 48.9], [2.3, 48.8]]]
}
within = client.geo.within("stores", geometry=polygon)

# List geo indexes
indexes = client.geo.list_indexes("stores")

# Delete index
client.geo.delete_index("stores", "location_idx")

client.ttl

Time-To-Live Indexes
client.use_database("mydb")

# Create TTL index (auto-expire documents)
client.ttl.create_index(
    collection="sessions",
    name="session_ttl",
    field="created_at",           # DateTime field to check
    expire_after_seconds=3600     # Expire after 1 hour
)

# Update expiration time
client.ttl.update_expiration("sessions", "session_ttl", 7200)  # 2 hours

# Get index info
info = client.ttl.get_index_info("sessions", "session_ttl")
print(f"Expires after: {info['expire_after_seconds']}s")

# Manually trigger cleanup (normally runs automatically)
result = client.ttl.run_cleanup("sessions")
print(f"Deleted {result['deleted']} expired documents")

# List TTL indexes
indexes = client.ttl.list_indexes("sessions")

# Delete TTL index
client.ttl.delete_index("sessions", "session_ttl")

client.columnar

Columnar/Analytics Storage
client.use_database("mydb")

# Create a columnar table (optimized for analytics)
table = client.columnar.create("metrics", [
    {"name": "timestamp", "type": "datetime"},
    {"name": "metric_name", "type": "string"},
    {"name": "value", "type": "float"},
    {"name": "tags", "type": "string"}
])

# Insert rows (batch insert is efficient)
client.columnar.insert("metrics", [
    {"timestamp": "2024-01-15T10:00:00Z", "metric_name": "cpu_usage", "value": 45.2, "tags": "server1"},
    {"timestamp": "2024-01-15T10:01:00Z", "metric_name": "cpu_usage", "value": 47.8, "tags": "server1"},
    {"timestamp": "2024-01-15T10:00:00Z", "metric_name": "memory", "value": 72.1, "tags": "server1"}
])

# Query with SQL-like syntax
results = client.columnar.query("metrics",
    "SELECT * FROM metrics WHERE value > @min ORDER BY timestamp DESC LIMIT 100",
    params={"min": 40.0}
)

# Aggregation
agg = client.columnar.aggregate("metrics", {
    "group_by": ["metric_name", "tags"],
    "metrics": [
        {"column": "value", "function": "avg"},
        {"column": "value", "function": "max"},
        {"column": "value", "function": "min"},
        {"column": "value", "function": "count"}
    ],
    "filters": {"metric_name": "cpu_usage"}  # optional
})

# Get table statistics
stats = client.columnar.stats("metrics")
print(f"Row count: {stats['row_count']}, Size: {stats['size_bytes']}")

# Add a column
client.columnar.add_column("metrics", "host", "string", default_value="unknown")

# Drop a column
client.columnar.drop_column("metrics", "host")

# List all columnar tables
tables = client.columnar.list()

# Delete table
client.columnar.delete("metrics")

client.cluster

Cluster Management
# Get cluster status
status = client.cluster.status()
print(f"Mode: {status['mode']}")  # standalone, cluster
print(f"Nodes: {status['node_count']}")

# Get detailed cluster info
info = client.cluster.info()

# Remove a node from cluster
client.cluster.remove_node("node-id-to-remove")

# Trigger data rebalancing
client.cluster.rebalance()

# Cleanup orphaned data
client.cluster.cleanup()

# Reshard cluster
client.cluster.reshard(num_shards=16)  # new number of shards

client.collections_mgmt

Advanced Collection Operations
client.use_database("mydb")

# Truncate collection (delete all documents)
client.collections_mgmt.truncate("logs")

# Compact collection (reclaim disk space)
client.collections_mgmt.compact("users")

# Repair collection (fix inconsistencies)
client.collections_mgmt.repair("orders")

# Get collection statistics
stats = client.collections_mgmt.stats("users")

# Set JSON schema validation
client.collections_mgmt.set_schema("users", {
    "type": "object",
    "required": ["name", "email"],
    "properties": {
        "name": {"type": "string", "minLength": 1},
        "email": {"type": "string", "format": "email"},
        "age": {"type": "integer", "minimum": 0}
    }
})

# Get current schema
schema = client.collections_mgmt.get_schema("users")

# Remove schema validation
client.collections_mgmt.delete_schema("users")

# Export collection
data = client.collections_mgmt.export("users", format="json")

# Import data
client.collections_mgmt.import_data("users_backup", data, format="json")

client.env

Environment Variables
client.use_database("mydb")

# List environment variables (for Lua scripts)
vars = client.env.list()

# Set an environment variable
client.env.set("API_KEY", "sk-xxx-your-api-key")
client.env.set("WEBHOOK_URL", "https://example.com/webhook")

# Delete an environment variable
client.env.delete("OLD_VAR")

Error Handling

from solidb import Client
from solidb.exceptions import ConnectionError, ServerError, ProtocolError

try:
    client = Client("127.0.0.1", 6745)
    client.connect()
    client.auth("mydb", "user", "password")

    doc = client.get("mydb", "users", "nonexistent-key")

except ConnectionError as e:
    # Network/connection issues
    print(f"Connection failed: {e}")

except ServerError as e:
    # Server-side errors (not found, validation, etc.)
    print(f"Server error: {e}")

except ProtocolError as e:
    # Protocol/serialization errors
    print(f"Protocol error: {e}")

finally:
    client.close()

ConnectionError

Network failures, connection refused, timeouts, disconnections

ServerError

Document not found, permission denied, validation errors

ProtocolError

Invalid response format, message too large, serialization issues