chore: Regenerate all playbooks

This commit is contained in:
GitLab CI 2025-11-19 16:21:52 +00:00
parent ab28aa03a0
commit 0e635bbca9
7 changed files with 2169 additions and 906 deletions

1993
LICENSE-3rd-party Normal file

File diff suppressed because it is too large Load Diff

View File

@ -55,6 +55,8 @@ The setup includes:
- **Rollback**: Stop and remove Docker containers, delete downloaded models if needed
Last Updated: 11/19/2025
## Instructions
## Step 1. Clone the repository

View File

@ -23,7 +23,6 @@ RUN pip install --no-cache-dir -r requirements.txt
# Copy the service code
COPY unified_gpu_service.py .
COPY pygraphistry_service.py .
# Create a non-root user for security (using a different UID to avoid conflicts)
RUN useradd -m -u 1001 appuser && chown -R appuser:appuser /app

View File

@ -9,12 +9,11 @@ This directory contains optional GPU-accelerated graph visualization services th
## 📦 Available Services
### 1. Unified GPU Service (`unified_gpu_service.py`)
Combines **PyGraphistry Cloud** and **Local GPU (cuGraph)** processing into a single FastAPI service.
Provides **Local GPU (cuGraph)** and **CPU (NetworkX)** processing in a single FastAPI service.
**Processing Modes:**
| Mode | Description | Requirements |
|------|-------------|--------------|
| **PyGraphistry Cloud** | Interactive GPU embeds in browser | API credentials |
| **Local GPU (cuGraph)** | Full GPU processing on your hardware | NVIDIA GPU + cuGraph |
| **Local CPU** | NetworkX fallback processing | None |
@ -27,9 +26,8 @@ Local GPU processing service with WebSocket support for real-time updates.
## 🛠️ Setup
### Prerequisites
- NVIDIA GPU with CUDA support (for GPU modes)
- NVIDIA GPU with CUDA support (for GPU mode)
- RAPIDS cuGraph (for local GPU processing)
- PyGraphistry account (for cloud mode)
### Installation
@ -94,9 +92,8 @@ Response:
```json
{
"processing_modes": {
"pygraphistry_cloud": {"available": true, "description": "..."},
"local_gpu": {"available": true, "description": "..."},
"local_cpu": {"available": true, "description": "..."}
"local_gpu": {"available": true, "description": "Local GPU processing with cuGraph/RAPIDS"},
"local_cpu": {"available": true, "description": "Local CPU fallback processing with NetworkX"}
},
"has_rapids": true,
"gpu_available": true
@ -108,33 +105,18 @@ Response:
The txt2kg frontend includes built-in components for GPU visualization:
- `UnifiedGPUViewer`: Connects to unified GPU service
- `PyGraphistryViewer`: Direct PyGraphistry cloud integration
- `ForceGraphWrapper`: Three.js WebGPU visualization (default)
### Using GPU Services in Frontend
The frontend has API routes that can connect to these services:
- `/api/pygraphistry/*`: PyGraphistry integration
- `/api/unified-gpu/*`: Unified GPU service integration
To use these services, ensure they are running separately and configure the frontend environment variables accordingly.
### Mode-Specific Processing
### Processing Graph Data
```javascript
// PyGraphistry Cloud mode
const response = await fetch('/api/unified-gpu/visualize', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
graph_data: { nodes, links },
processing_mode: 'pygraphistry_cloud',
layout_type: 'force',
clustering: true,
gpu_acceleration: true
})
})
// Local GPU mode
const response = await fetch('/api/unified-gpu/visualize', {
method: 'POST',
@ -147,15 +129,20 @@ const response = await fetch('/api/unified-gpu/visualize', {
compute_centrality: true
})
})
// Local CPU mode
const response = await fetch('/api/unified-gpu/visualize', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
graph_data: { nodes, links },
processing_mode: 'local_cpu'
})
})
```
## 🔧 Configuration Options
### PyGraphistry Cloud Mode
- `layout_type`: "force", "circular", "hierarchical"
- `gpu_acceleration`: true/false
- `clustering`: true/false
### Local GPU Mode
- `layout_algorithm`: "force_atlas2", "spectral", "fruchterman_reingold"
- `clustering_algorithm`: "leiden", "louvain", "spectral"
@ -172,8 +159,7 @@ const response = await fetch('/api/unified-gpu/visualize', {
"processed_nodes": [...],
"processed_edges": [...],
"processing_mode": "local_gpu",
"embed_url": "https://hub.graphistry.com/...", // Only for cloud mode
"layout_positions": {...}, // Only for local GPU mode
"layout_positions": {...},
"clusters": {...},
"centrality": {...},
"stats": {
@ -187,18 +173,17 @@ const response = await fetch('/api/unified-gpu/visualize', {
}
```
## 🚀 Benefits of Unified Approach
## 🚀 Benefits
### ✅ Advantages
- **Single service** - One port, one deployment
- **Mode switching** - Choose best processing per graph
- **Fallback handling** - Graceful degradation if GPU unavailable
- **Consistent API** - Same interface for all modes
- **Better testing** - Easy comparison between modes
- **No GPL dependencies** - All dependencies are permissively licensed
### 🎯 Use Cases
- **PyGraphistry Cloud**: Sharing visualizations, demos, production embeds
- **Local GPU**: Private data, large-scale processing, custom algorithms
- **Local GPU**: Private data, large-scale processing, GPU-accelerated algorithms
- **Local CPU**: Development, testing, small graphs
## 🐛 Troubleshooting
@ -212,16 +197,6 @@ nvidia-smi
python -c "import cudf, cugraph; print('RAPIDS OK')"
```
### PyGraphistry Credentials
```bash
# Verify credentials are set
echo $GRAPHISTRY_PERSONAL_KEY
echo $GRAPHISTRY_SECRET_KEY
# Test connection
python -c "import graphistry; graphistry.register(personal_key_id='$GRAPHISTRY_PERSONAL_KEY', personal_key_secret='$GRAPHISTRY_SECRET_KEY'); print('PyGraphistry OK')"
```
### Service Health
```bash
curl http://localhost:8080/api/health
@ -229,7 +204,13 @@ curl http://localhost:8080/api/health
## 📈 Performance Tips
1. **Large graphs (>100k nodes)**: Use `local_gpu` mode
2. **Sharing/demos**: Use `pygraphistry_cloud` mode
3. **Development**: Use `local_cpu` mode for speed
4. **Mixed workloads**: Switch modes dynamically based on graph size
1. **Large graphs (>100k nodes)**: Use `local_gpu` mode with RAPIDS cuGraph
2. **Development**: Use `local_cpu` mode for speed and simplicity
3. **Mixed workloads**: Switch modes dynamically based on graph size and GPU availability
## 📝 License Compliance
This service has been updated to remove all GPL-licensed dependencies:
- ❌ Removed: `igraph` (GPL v2+)
- ❌ Removed: `graphistry` with `compute_igraph` (uses igraph internally)
- ✅ Uses only: NetworkX (BSD), cuGraph (Apache 2.0), NumPy (BSD), pandas (BSD)

View File

@ -1,712 +0,0 @@
import graphistry
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional
import asyncio
import json
from datetime import datetime
import logging
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import uvicorn
import os
import time
from concurrent.futures import ThreadPoolExecutor
import networkx as nx
from enum import Enum
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize PyGraphistry
def init_graphistry():
"""Initialize PyGraphistry with GPU acceleration"""
try:
# Set up authentication - check for different credential types
api_key = os.getenv('GRAPHISTRY_API_KEY')
personal_key = os.getenv('GRAPHISTRY_PERSONAL_KEY')
secret_key = os.getenv('GRAPHISTRY_SECRET_KEY')
username = os.getenv('GRAPHISTRY_USERNAME')
password = os.getenv('GRAPHISTRY_PASSWORD')
if personal_key and secret_key:
# Configure for cloud API with personal key and secret
graphistry.register(
api=3,
protocol="https",
server="hub.graphistry.com",
personal_key_id=personal_key,
personal_key_secret=secret_key
)
logger.info("PyGraphistry initialized with personal key/secret for cloud GPU acceleration")
return True
elif api_key:
# Configure for cloud API with API key
graphistry.register(api=3, protocol="https", server="hub.graphistry.com", api_key=api_key)
logger.info("PyGraphistry initialized with API key for cloud GPU acceleration")
return True
elif username and password:
# Configure for cloud API with username/password
graphistry.register(api=3, protocol="https", server="hub.graphistry.com",
username=username, password=password)
logger.info("PyGraphistry initialized with username/password for cloud GPU acceleration")
return True
else:
# Configure for local mode
graphistry.register(api=3)
logger.info("PyGraphistry initialized in local CPU mode")
return True
except Exception as e:
logger.error(f"Failed to initialize PyGraphistry: {e}")
return False
class GraphPattern(str, Enum):
RANDOM = "random"
SCALE_FREE = "scale-free"
SMALL_WORLD = "small-world"
CLUSTERED = "clustered"
HIERARCHICAL = "hierarchical"
GRID = "grid"
class GraphData(BaseModel):
nodes: List[Dict[str, Any]]
links: List[Dict[str, Any]]
class GraphGenerationRequest(BaseModel):
num_nodes: int
pattern: GraphPattern = GraphPattern.SCALE_FREE
avg_degree: Optional[int] = 5
num_clusters: Optional[int] = 100
small_world_k: Optional[int] = 6
small_world_p: Optional[float] = 0.1
grid_dimensions: Optional[List[int]] = [100, 100]
seed: Optional[int] = None
class VisualizationRequest(BaseModel):
graph_data: GraphData
layout_type: Optional[str] = "force"
gpu_acceleration: Optional[bool] = True
clustering: Optional[bool] = False
node_size_attribute: Optional[str] = None
node_color_attribute: Optional[str] = None
edge_weight_attribute: Optional[str] = None
class GraphGenerationStatus(BaseModel):
task_id: str
status: str # "running", "completed", "failed"
progress: float
message: str
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
class LargeGraphGenerator:
"""Optimized graph generation using NetworkX and NumPy for performance"""
@staticmethod
def generate_random_graph(num_nodes: int, avg_degree: int = 5, seed: Optional[int] = None) -> GraphData:
"""Generate random graph using ErdősRényi model"""
if seed:
np.random.seed(seed)
# Calculate edge probability for desired average degree
p = avg_degree / (num_nodes - 1)
# Use NetworkX for efficient generation
G = nx.erdos_renyi_graph(num_nodes, p, seed=seed)
return LargeGraphGenerator._networkx_to_graphdata(G)
@staticmethod
def generate_scale_free_graph(num_nodes: int, m: int = 3, seed: Optional[int] = None) -> GraphData:
"""Generate scale-free graph using BarabásiAlbert model"""
G = nx.barabasi_albert_graph(num_nodes, m, seed=seed)
return LargeGraphGenerator._networkx_to_graphdata(G)
@staticmethod
def generate_small_world_graph(num_nodes: int, k: int = 6, p: float = 0.1, seed: Optional[int] = None) -> GraphData:
"""Generate small-world graph using Watts-Strogatz model"""
G = nx.watts_strogatz_graph(num_nodes, k, p, seed=seed)
return LargeGraphGenerator._networkx_to_graphdata(G)
@staticmethod
def generate_clustered_graph(num_nodes: int, num_clusters: int = 100, seed: Optional[int] = None) -> GraphData:
"""Generate clustered graph with intra and inter-cluster connections"""
if seed:
np.random.seed(seed)
cluster_size = num_nodes // num_clusters
G = nx.Graph()
# Add nodes with cluster information
for i in range(num_nodes):
cluster_id = i // cluster_size
G.add_node(i, cluster=cluster_id)
# Generate intra-cluster edges
intra_prob = 0.1
for cluster in range(num_clusters):
cluster_start = cluster * cluster_size
cluster_end = min(cluster_start + cluster_size, num_nodes)
cluster_nodes = list(range(cluster_start, cluster_end))
# Create subgraph for cluster
cluster_subgraph = nx.erdos_renyi_graph(len(cluster_nodes), intra_prob)
# Add edges to main graph with proper node mapping
for edge in cluster_subgraph.edges():
G.add_edge(cluster_nodes[edge[0]], cluster_nodes[edge[1]])
# Generate inter-cluster edges
inter_prob = 0.001
for i in range(num_nodes):
for j in range(i + 1, num_nodes):
if G.nodes[i].get('cluster') != G.nodes[j].get('cluster'):
if np.random.random() < inter_prob:
G.add_edge(i, j)
return LargeGraphGenerator._networkx_to_graphdata(G)
@staticmethod
def generate_hierarchical_graph(num_nodes: int, branching_factor: int = 3, seed: Optional[int] = None) -> GraphData:
"""Generate hierarchical (tree-like) graph"""
G = nx.random_tree(num_nodes, seed=seed)
# Add some cross-links to make it more interesting
if seed:
np.random.seed(seed)
# Add 10% additional edges for cross-connections
num_additional_edges = max(1, num_nodes // 10)
nodes = list(G.nodes())
for _ in range(num_additional_edges):
u, v = np.random.choice(nodes, 2, replace=False)
if not G.has_edge(u, v):
G.add_edge(u, v)
return LargeGraphGenerator._networkx_to_graphdata(G)
@staticmethod
def generate_grid_graph(dimensions: List[int], seed: Optional[int] = None) -> GraphData:
"""Generate 2D or 3D grid graph"""
if len(dimensions) == 2:
G = nx.grid_2d_graph(dimensions[0], dimensions[1])
elif len(dimensions) == 3:
G = nx.grid_graph(dimensions)
else:
raise ValueError("Grid dimensions must be 2D or 3D")
# Convert coordinate tuples to integer node IDs
mapping = {node: i for i, node in enumerate(G.nodes())}
G = nx.relabel_nodes(G, mapping)
return LargeGraphGenerator._networkx_to_graphdata(G)
@staticmethod
def _networkx_to_graphdata(G: nx.Graph) -> GraphData:
"""Convert NetworkX graph to GraphData format"""
nodes = []
links = []
# Convert nodes
for node_id in G.nodes():
node_data = G.nodes[node_id]
node = {
"id": f"n{node_id}",
"name": f"Node {node_id}",
"val": np.random.randint(1, 11),
"degree": G.degree(node_id)
}
# Add cluster information if available
if 'cluster' in node_data:
node['group'] = f"cluster_{node_data['cluster']}"
else:
node['group'] = f"group_{node_id % 10}"
nodes.append(node)
# Convert edges
for edge in G.edges():
link = {
"source": f"n{edge[0]}",
"target": f"n{edge[1]}",
"name": f"link_{edge[0]}_{edge[1]}",
"weight": np.random.uniform(0.1, 5.0)
}
links.append(link)
return GraphData(nodes=nodes, links=links)
class PyGraphistryService:
def __init__(self):
self.initialized = init_graphistry()
self.generation_tasks = {} # Store background tasks
self.executor = ThreadPoolExecutor(max_workers=4)
async def generate_graph_async(self, request: GraphGenerationRequest, task_id: str):
"""Generate graph asynchronously"""
try:
self.generation_tasks[task_id] = GraphGenerationStatus(
task_id=task_id,
status="running",
progress=0.0,
message="Starting graph generation..."
)
start_time = time.time()
# Update progress
self.generation_tasks[task_id].progress = 10.0
self.generation_tasks[task_id].message = f"Generating {request.pattern.value} graph with {request.num_nodes} nodes..."
# Generate graph based on pattern
if request.pattern == GraphPattern.RANDOM:
graph_data = LargeGraphGenerator.generate_random_graph(
request.num_nodes, request.avg_degree, request.seed
)
elif request.pattern == GraphPattern.SCALE_FREE:
m = min(request.avg_degree, request.num_nodes - 1) if request.avg_degree else 3
graph_data = LargeGraphGenerator.generate_scale_free_graph(
request.num_nodes, m, request.seed
)
elif request.pattern == GraphPattern.SMALL_WORLD:
graph_data = LargeGraphGenerator.generate_small_world_graph(
request.num_nodes,
request.small_world_k or 6,
request.small_world_p or 0.1,
request.seed
)
elif request.pattern == GraphPattern.CLUSTERED:
graph_data = LargeGraphGenerator.generate_clustered_graph(
request.num_nodes, request.num_clusters or 100, request.seed
)
elif request.pattern == GraphPattern.HIERARCHICAL:
graph_data = LargeGraphGenerator.generate_hierarchical_graph(
request.num_nodes, seed=request.seed
)
elif request.pattern == GraphPattern.GRID:
# Calculate grid dimensions for given number of nodes
if request.grid_dimensions:
dimensions = request.grid_dimensions
else:
side_length = int(np.sqrt(request.num_nodes))
dimensions = [side_length, side_length]
graph_data = LargeGraphGenerator.generate_grid_graph(dimensions, request.seed)
else:
raise ValueError(f"Unknown graph pattern: {request.pattern}")
# Update progress
self.generation_tasks[task_id].progress = 80.0
self.generation_tasks[task_id].message = "Computing graph statistics..."
# Calculate statistics
generation_time = time.time() - start_time
stats = {
"node_count": len(graph_data.nodes),
"edge_count": len(graph_data.links),
"generation_time": generation_time,
"density": len(graph_data.links) / (len(graph_data.nodes) * (len(graph_data.nodes) - 1) / 2) if len(graph_data.nodes) > 1 else 0,
"avg_degree": 2 * len(graph_data.links) / len(graph_data.nodes) if len(graph_data.nodes) > 0 else 0,
"pattern": request.pattern.value,
"parameters": request.model_dump()
}
# Complete task
self.generation_tasks[task_id].status = "completed"
self.generation_tasks[task_id].progress = 100.0
self.generation_tasks[task_id].message = f"Generated {stats['node_count']} nodes and {stats['edge_count']} edges in {generation_time:.2f}s"
self.generation_tasks[task_id].result = {
"graph_data": graph_data.model_dump(),
"stats": stats
}
logger.info(f"Graph generation completed for task {task_id}: {stats}")
except Exception as e:
logger.error(f"Graph generation failed for task {task_id}: {e}")
self.generation_tasks[task_id].status = "failed"
self.generation_tasks[task_id].error = str(e)
self.generation_tasks[task_id].message = f"Generation failed: {e}"
async def start_graph_generation(self, request: GraphGenerationRequest) -> str:
"""Start graph generation as background task"""
task_id = f"gen_{int(time.time() * 1000)}"
# Run generation in thread pool to avoid blocking
loop = asyncio.get_event_loop()
loop.run_in_executor(
self.executor,
lambda: asyncio.run(self.generate_graph_async(request, task_id))
)
return task_id
def get_generation_status(self, task_id: str) -> Optional[GraphGenerationStatus]:
"""Get status of graph generation task"""
return self.generation_tasks.get(task_id)
async def process_graph_data(self, request: VisualizationRequest) -> Dict[str, Any]:
"""Process graph data with PyGraphistry GPU acceleration"""
try:
if not self.initialized:
raise HTTPException(status_code=500, detail="PyGraphistry not initialized")
# Convert to pandas DataFrames for PyGraphistry
nodes_df = pd.DataFrame(request.graph_data.nodes)
edges_df = pd.DataFrame(request.graph_data.links)
# Ensure required columns exist
if 'id' not in nodes_df.columns:
nodes_df['id'] = nodes_df.index
if 'source' not in edges_df.columns or 'target' not in edges_df.columns:
raise HTTPException(status_code=400, detail="Links must have source and target columns")
logger.info(f"Processing graph with {len(nodes_df)} nodes and {len(edges_df)} edges")
# Create PyGraphistry graph object
try:
g = graphistry.edges(edges_df, 'source', 'target').nodes(nodes_df, 'id')
logger.info(f"Created PyGraphistry graph object")
except Exception as e:
logger.error(f"Failed to create PyGraphistry graph: {e}")
raise HTTPException(status_code=500, detail=f"Graph creation failed: {e}")
# Apply GPU-accelerated processing
if request.gpu_acceleration:
g = await self._apply_gpu_acceleration(g, request)
# Apply clustering if requested
if request.clustering:
g = await self._apply_clustering(g)
# Generate layout
g = await self._generate_layout(g, request.layout_type)
# Extract processed data
try:
processed_nodes = g._nodes.to_dict('records') if g._nodes is not None else nodes_df.to_dict('records')
processed_edges = g._edges.to_dict('records') if g._edges is not None else edges_df.to_dict('records')
logger.info(f"Extracted {len(processed_nodes)} nodes and {len(processed_edges)} edges")
except Exception as e:
logger.warning(f"Data extraction failed, using original data: {e}")
processed_nodes = nodes_df.to_dict('records')
processed_edges = edges_df.to_dict('records')
# Generate embedding URL for interactive visualization
embed_url = None
local_viz_data = None
try:
embed_url = g.plot(render=False)
logger.info(f"Generated PyGraphistry embed URL: {embed_url}")
except Exception as e:
logger.warning(f"Could not generate embed URL (likely running in local mode): {e}")
# Create local visualization data as fallback
try:
local_viz_data = self._create_local_viz_data(g, processed_nodes, processed_edges)
logger.info("Generated local visualization data as fallback")
except Exception as viz_e:
logger.warning(f"Could not generate local visualization data: {viz_e}")
return {
"processed_nodes": processed_nodes,
"processed_edges": processed_edges,
"embed_url": embed_url,
"local_viz_data": local_viz_data,
"stats": {
"node_count": len(processed_nodes),
"edge_count": len(processed_edges),
"gpu_accelerated": request.gpu_acceleration,
"clustered": request.clustering,
"layout_type": request.layout_type,
"has_embed_url": embed_url is not None,
"has_local_viz": local_viz_data is not None
},
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error processing graph data: {e}")
raise HTTPException(status_code=500, detail=str(e))
async def _apply_gpu_acceleration(self, g, request: VisualizationRequest):
"""Apply GPU acceleration using PyGraphistry's vector processing"""
try:
if not request.gpu_acceleration:
logger.info("GPU acceleration disabled by request")
return g
logger.info("=== GPU ACCELERATION ATTEMPT ===")
logger.info(f"PyGraphistry object type: {type(g)}")
logger.info(f"Available methods: {[method for method in dir(g) if not method.startswith('_')]}")
# Check what GPU methods are actually available
has_compute_igraph = hasattr(g, 'compute_igraph')
has_umap = hasattr(g, 'umap')
logger.info(f"Has compute_igraph: {has_compute_igraph}")
logger.info(f"Has UMAP: {has_umap}")
gpu_operations_successful = 0
total_gpu_operations = 0
# Compute centrality measures if available
total_gpu_operations += 1
try:
if has_compute_igraph and len(g._nodes) < 50000: # Limit for performance
logger.info("Attempting PageRank computation...")
g = g.compute_igraph('pagerank', out_col='pagerank')
logger.info("✓ SUCCESS: Computed PageRank centrality with GPU")
gpu_operations_successful += 1
else:
reason = "too many nodes" if len(g._nodes) >= 50000 else "compute_igraph not available"
logger.warning(f"✗ SKIPPED: PageRank computation ({reason})")
except Exception as e:
logger.warning(f"✗ FAILED: PageRank computation failed: {e}")
# Apply UMAP for node positioning if available and beneficial
total_gpu_operations += 1
try:
if has_umap and len(g._nodes) > 100 and len(g._nodes) < 10000:
logger.info("Attempting UMAP for node positioning...")
g = g.umap()
logger.info("✓ SUCCESS: Applied UMAP for node positioning")
gpu_operations_successful += 1
else:
reason = ("UMAP not available" if not has_umap else
"too few nodes" if len(g._nodes) <= 100 else "too many nodes")
logger.warning(f"✗ SKIPPED: UMAP processing ({reason})")
except Exception as e:
logger.warning(f"✗ FAILED: UMAP processing failed: {e}")
logger.info(f"=== GPU ACCELERATION SUMMARY ===")
logger.info(f"GPU operations successful: {gpu_operations_successful}/{total_gpu_operations}")
logger.info(f"GPU utilization: {(gpu_operations_successful/total_gpu_operations)*100:.1f}%")
return g
except Exception as e:
logger.warning(f"GPU acceleration failed completely, falling back to CPU: {e}")
return g
async def _apply_clustering(self, g):
"""Apply GPU-accelerated clustering"""
try:
logger.info("=== CLUSTERING ATTEMPT ===")
# Use PyGraphistry's built-in clustering if available
if hasattr(g, 'compute_igraph') and len(g._nodes) < 20000: # Limit for performance
logger.info("Attempting Leiden community detection...")
try:
g = g.compute_igraph('community_leiden', out_col='cluster')
logger.info("✓ SUCCESS: Applied Leiden community detection")
return g
except Exception as e:
logger.warning(f"✗ FAILED: Leiden clustering failed: {e}")
logger.info("Attempting Louvain community detection as fallback...")
try:
g = g.compute_igraph('community_louvain', out_col='cluster')
logger.info("✓ SUCCESS: Applied Louvain community detection")
return g
except Exception as e2:
logger.warning(f"✗ FAILED: Louvain clustering also failed: {e2}")
else:
reason = "too many nodes" if len(g._nodes) >= 20000 else "compute_igraph not available"
logger.warning(f"✗ SKIPPED: Clustering ({reason})")
logger.info("=== CLUSTERING SUMMARY: No clustering applied ===")
return g
except Exception as e:
logger.warning(f"Clustering failed completely: {e}")
return g
async def _generate_layout(self, g, layout_type: str = "force"):
"""Generate layout using PyGraphistry's algorithms"""
try:
logger.info(f"Generating {layout_type} layout")
# Only apply layout computation for reasonable graph sizes
if len(g._nodes) > 50000:
logger.info("Skipping layout computation for very large graph")
return g
if hasattr(g, 'compute_igraph'):
try:
if layout_type == "force":
g = g.compute_igraph('layout_fruchterman_reingold', out_cols=['x', 'y'])
logger.info("Applied Fruchterman-Reingold force layout")
elif layout_type == "circular":
g = g.compute_igraph('layout_circle', out_cols=['x', 'y'])
logger.info("Applied circular layout")
elif layout_type == "hierarchical":
g = g.compute_igraph('layout_sugiyama', out_cols=['x', 'y'])
logger.info("Applied hierarchical layout")
else:
# Default to force-directed
g = g.compute_igraph('layout_fruchterman_reingold', out_cols=['x', 'y'])
logger.info("Applied default force layout")
except Exception as e:
logger.warning(f"Layout computation failed: {e}")
else:
logger.info("Layout computation not available, using default positioning")
return g
except Exception as e:
logger.warning(f"Layout generation failed: {e}")
return g
def _create_local_viz_data(self, g, processed_nodes: List[Dict], processed_edges: List[Dict]) -> Dict[str, Any]:
"""Create local visualization data when embed URL cannot be generated"""
try:
# Extract layout positions if available
positions = {}
if g._nodes is not None and 'x' in g._nodes.columns and 'y' in g._nodes.columns:
for _, row in g._nodes.iterrows():
node_id = row.get('id', row.name)
positions[str(node_id)] = {
'x': float(row['x']) if pd.notna(row['x']) else 0,
'y': float(row['y']) if pd.notna(row['y']) else 0
}
# Add cluster information if available
clusters = {}
if g._nodes is not None and 'cluster' in g._nodes.columns:
for _, row in g._nodes.iterrows():
node_id = row.get('id', row.name)
if pd.notna(row['cluster']):
clusters[str(node_id)] = int(row['cluster'])
# Create enhanced nodes with layout and cluster info
enhanced_nodes = []
for node in processed_nodes:
enhanced_node = node.copy()
node_id = str(node.get('id', ''))
if node_id in positions:
enhanced_node.update(positions[node_id])
if node_id in clusters:
enhanced_node['cluster'] = clusters[node_id]
enhanced_nodes.append(enhanced_node)
return {
"nodes": enhanced_nodes,
"edges": processed_edges,
"positions": positions,
"clusters": clusters,
"layout_computed": len(positions) > 0,
"clusters_computed": len(clusters) > 0
}
except Exception as e:
logger.error(f"Failed to create local visualization data: {e}")
return {
"nodes": processed_nodes,
"edges": processed_edges,
"positions": {},
"clusters": {},
"layout_computed": False,
"clusters_computed": False
}
async def get_graph_stats(self, graph_data: GraphData) -> Dict[str, Any]:
"""Get GPU-accelerated graph statistics"""
try:
nodes_df = pd.DataFrame(graph_data.nodes)
edges_df = pd.DataFrame(graph_data.links)
g = graphistry.edges(edges_df, 'source', 'target').nodes(nodes_df, 'id')
# Compute various graph metrics using GPU acceleration
stats = {
"node_count": len(nodes_df),
"edge_count": len(edges_df),
"density": len(edges_df) / (len(nodes_df) * (len(nodes_df) - 1)) if len(nodes_df) > 1 else 0,
"timestamp": datetime.now().isoformat()
}
# Add centrality measures if possible
try:
if len(nodes_df) < 10000 and hasattr(g, 'compute_igraph'): # Only for reasonably sized graphs
g_with_metrics = g.compute_igraph('pagerank', out_col='pagerank')
if g_with_metrics._nodes is not None and 'pagerank' in g_with_metrics._nodes.columns:
pagerank_data = g_with_metrics._nodes['pagerank'].to_list()
stats.update({
"avg_pagerank": float(np.mean(pagerank_data)),
"max_pagerank": float(np.max(pagerank_data))
})
logger.info("Computed PageRank statistics")
except Exception as e:
logger.warning(f"Could not compute centrality measures: {e}")
return stats
except Exception as e:
logger.error(f"Error computing graph stats: {e}")
raise HTTPException(status_code=500, detail=str(e))
# FastAPI app
app = FastAPI(title="PyGraphistry GPU Visualization Service", version="1.0.0")
service = PyGraphistryService()
@app.post("/api/generate")
async def generate_graph(request: GraphGenerationRequest):
"""Start graph generation as background task"""
if request.num_nodes > 1000000:
raise HTTPException(status_code=400, detail="Maximum 1 million nodes allowed")
task_id = await service.start_graph_generation(request)
return {"task_id": task_id, "status": "started"}
@app.get("/api/generate/{task_id}")
async def get_generation_status(task_id: str):
"""Get status of graph generation task"""
status = service.get_generation_status(task_id)
if not status:
raise HTTPException(status_code=404, detail="Task not found")
return status
@app.post("/api/visualize")
async def visualize_graph(request: VisualizationRequest):
"""Process graph data with PyGraphistry GPU acceleration"""
return await service.process_graph_data(request)
@app.post("/api/stats")
async def get_graph_statistics(graph_data: GraphData):
"""Get GPU-accelerated graph statistics"""
return await service.get_graph_stats(graph_data)
@app.get("/api/health")
async def health_check():
"""Health check endpoint"""
return {
"status": "healthy",
"pygraphistry_initialized": service.initialized,
"timestamp": datetime.now().isoformat()
}
@app.get("/api/patterns")
async def get_available_patterns():
"""Get list of available graph generation patterns"""
return {
"patterns": [
{
"name": pattern.value,
"description": {
GraphPattern.RANDOM: "Random graph using ErdősRényi model",
GraphPattern.SCALE_FREE: "Scale-free graph using BarabásiAlbert model",
GraphPattern.SMALL_WORLD: "Small-world graph using Watts-Strogatz model",
GraphPattern.CLUSTERED: "Clustered graph with community structure",
GraphPattern.HIERARCHICAL: "Hierarchical tree-like graph with cross-links",
GraphPattern.GRID: "2D or 3D grid graph"
}[pattern]
} for pattern in GraphPattern
]
}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080)

View File

@ -1,4 +1,3 @@
graphistry>=0.32.0
pandas>=2.0.0
numpy>=1.24.0
fastapi>=0.104.0
@ -7,7 +6,6 @@ pydantic>=2.0.0
networkx>=3.0 # For efficient graph generation algorithms
# cudf, cuml, cugraph are already included in PyG container
# cupy>=12.0.0 # Already included in PyG container
igraph>=0.10.0 # For additional graph algorithms
scikit-learn>=1.3.0 # For additional ML features
requests>=2.31.0
aiofiles>=23.0.0

View File

@ -2,8 +2,7 @@
"""
Unified GPU Graph Visualization Service
Combines PyGraphistry cloud processing and local GPU processing with cuGraph
into a single FastAPI service for maximum flexibility.
Provides local GPU processing with cuGraph and CPU fallback with NetworkX.
"""
import os
@ -23,9 +22,6 @@ from concurrent.futures import ThreadPoolExecutor
import networkx as nx
from enum import Enum
# PyGraphistry imports
import graphistry
# GPU-accelerated imports (available in NVIDIA PyG container)
try:
import cudf
@ -52,7 +48,6 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ProcessingMode(str, Enum):
PYGRAPHISTRY_CLOUD = "pygraphistry_cloud"
LOCAL_GPU = "local_gpu"
LOCAL_CPU = "local_cpu"
@ -80,12 +75,7 @@ class GraphGenerationRequest(BaseModel):
class UnifiedVisualizationRequest(BaseModel):
graph_data: GraphData
processing_mode: ProcessingMode = ProcessingMode.PYGRAPHISTRY_CLOUD
# PyGraphistry Cloud options
layout_type: Optional[str] = "force"
gpu_acceleration: Optional[bool] = True
clustering: Optional[bool] = False
processing_mode: ProcessingMode = ProcessingMode.LOCAL_GPU
# Local GPU options
layout_algorithm: Optional[str] = "force_atlas2"
@ -100,8 +90,144 @@ class GraphGenerationStatus(BaseModel):
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
# Import graph generation classes (keeping existing code)
from pygraphistry_service import LargeGraphGenerator, init_graphistry
class LargeGraphGenerator:
"""Optimized graph generation using NetworkX and NumPy for performance"""
@staticmethod
def generate_random_graph(num_nodes: int, avg_degree: int = 5, seed: Optional[int] = None) -> GraphData:
"""Generate random graph using ErdősRényi model"""
if seed:
np.random.seed(seed)
# Calculate edge probability for desired average degree
p = avg_degree / (num_nodes - 1)
# Use NetworkX for efficient generation
G = nx.erdos_renyi_graph(num_nodes, p, seed=seed)
return LargeGraphGenerator._networkx_to_graphdata(G)
@staticmethod
def generate_scale_free_graph(num_nodes: int, m: int = 3, seed: Optional[int] = None) -> GraphData:
"""Generate scale-free graph using BarabásiAlbert model"""
G = nx.barabasi_albert_graph(num_nodes, m, seed=seed)
return LargeGraphGenerator._networkx_to_graphdata(G)
@staticmethod
def generate_small_world_graph(num_nodes: int, k: int = 6, p: float = 0.1, seed: Optional[int] = None) -> GraphData:
"""Generate small-world graph using Watts-Strogatz model"""
G = nx.watts_strogatz_graph(num_nodes, k, p, seed=seed)
return LargeGraphGenerator._networkx_to_graphdata(G)
@staticmethod
def generate_clustered_graph(num_nodes: int, num_clusters: int = 100, seed: Optional[int] = None) -> GraphData:
"""Generate clustered graph with intra and inter-cluster connections"""
if seed:
np.random.seed(seed)
cluster_size = num_nodes // num_clusters
G = nx.Graph()
# Add nodes with cluster information
for i in range(num_nodes):
cluster_id = i // cluster_size
G.add_node(i, cluster=cluster_id)
# Generate intra-cluster edges
intra_prob = 0.1
for cluster in range(num_clusters):
cluster_start = cluster * cluster_size
cluster_end = min(cluster_start + cluster_size, num_nodes)
cluster_nodes = list(range(cluster_start, cluster_end))
# Create subgraph for cluster
cluster_subgraph = nx.erdos_renyi_graph(len(cluster_nodes), intra_prob)
# Add edges to main graph with proper node mapping
for edge in cluster_subgraph.edges():
G.add_edge(cluster_nodes[edge[0]], cluster_nodes[edge[1]])
# Generate inter-cluster edges
inter_prob = 0.001
for i in range(num_nodes):
for j in range(i + 1, num_nodes):
if G.nodes[i].get('cluster') != G.nodes[j].get('cluster'):
if np.random.random() < inter_prob:
G.add_edge(i, j)
return LargeGraphGenerator._networkx_to_graphdata(G)
@staticmethod
def generate_hierarchical_graph(num_nodes: int, branching_factor: int = 3, seed: Optional[int] = None) -> GraphData:
"""Generate hierarchical (tree-like) graph"""
G = nx.random_tree(num_nodes, seed=seed)
# Add some cross-links to make it more interesting
if seed:
np.random.seed(seed)
# Add 10% additional edges for cross-connections
num_additional_edges = max(1, num_nodes // 10)
nodes = list(G.nodes())
for _ in range(num_additional_edges):
u, v = np.random.choice(nodes, 2, replace=False)
if not G.has_edge(u, v):
G.add_edge(u, v)
return LargeGraphGenerator._networkx_to_graphdata(G)
@staticmethod
def generate_grid_graph(dimensions: List[int], seed: Optional[int] = None) -> GraphData:
"""Generate 2D or 3D grid graph"""
if len(dimensions) == 2:
G = nx.grid_2d_graph(dimensions[0], dimensions[1])
elif len(dimensions) == 3:
G = nx.grid_graph(dimensions)
else:
raise ValueError("Grid dimensions must be 2D or 3D")
# Convert coordinate tuples to integer node IDs
mapping = {node: i for i, node in enumerate(G.nodes())}
G = nx.relabel_nodes(G, mapping)
return LargeGraphGenerator._networkx_to_graphdata(G)
@staticmethod
def _networkx_to_graphdata(G: nx.Graph) -> GraphData:
"""Convert NetworkX graph to GraphData format"""
nodes = []
links = []
# Convert nodes
for node_id in G.nodes():
node_data = G.nodes[node_id]
node = {
"id": f"n{node_id}",
"name": f"Node {node_id}",
"val": np.random.randint(1, 11),
"degree": G.degree(node_id)
}
# Add cluster information if available
if 'cluster' in node_data:
node['group'] = f"cluster_{node_data['cluster']}"
else:
node['group'] = f"group_{node_id % 10}"
nodes.append(node)
# Convert edges
for edge in G.edges():
link = {
"source": f"n{edge[0]}",
"target": f"n{edge[1]}",
"name": f"link_{edge[0]}_{edge[1]}",
"weight": np.random.uniform(0.1, 5.0)
}
links.append(link)
return GraphData(nodes=nodes, links=links)
class LocalGPUProcessor:
"""GPU-accelerated graph processing using cuGraph"""
@ -207,109 +333,10 @@ class LocalGPUProcessor:
logger.error(f"GPU centrality computation failed: {e}")
return {}
class PyGraphistryProcessor:
"""PyGraphistry cloud processing (existing functionality)"""
def __init__(self):
self.initialized = init_graphistry()
async def process_graph_data(self, request: UnifiedVisualizationRequest) -> Dict[str, Any]:
"""Process graph data with PyGraphistry GPU acceleration"""
try:
if not self.initialized:
raise HTTPException(status_code=500, detail="PyGraphistry not initialized")
# Convert to pandas DataFrames for PyGraphistry
nodes_df = pd.DataFrame(request.graph_data.nodes)
edges_df = pd.DataFrame(request.graph_data.links)
# Ensure required columns exist
if 'id' not in nodes_df.columns:
nodes_df['id'] = nodes_df.index
if 'source' not in edges_df.columns or 'target' not in edges_df.columns:
raise HTTPException(status_code=400, detail="Links must have source and target columns")
logger.info(f"Processing graph with {len(nodes_df)} nodes and {len(edges_df)} edges")
# Create PyGraphistry graph object
g = graphistry.edges(edges_df, 'source', 'target').nodes(nodes_df, 'id')
# Apply GPU-accelerated processing
if request.gpu_acceleration:
g = await self._apply_gpu_acceleration(g, request)
# Apply clustering if requested
if request.clustering:
g = await self._apply_clustering(g)
# Generate layout
g = await self._generate_layout(g, request.layout_type)
# Extract processed data
processed_nodes = g._nodes.to_dict('records') if g._nodes is not None else nodes_df.to_dict('records')
processed_edges = g._edges.to_dict('records') if g._edges is not None else edges_df.to_dict('records')
# Generate embedding URL for interactive visualization
embed_url = None
local_viz_data = None
try:
embed_url = g.plot(render=False)
logger.info(f"Generated PyGraphistry embed URL: {embed_url}")
except Exception as e:
logger.warning(f"Could not generate embed URL (likely running in local mode): {e}")
# Create local visualization data as fallback
try:
local_viz_data = self._create_local_viz_data(g, processed_nodes, processed_edges)
logger.info("Generated local visualization data as fallback")
except Exception as viz_e:
logger.warning(f"Could not generate local visualization data: {viz_e}")
return {
"processed_nodes": processed_nodes,
"processed_edges": processed_edges,
"embed_url": embed_url,
"local_viz_data": local_viz_data,
"processing_mode": ProcessingMode.PYGRAPHISTRY_CLOUD,
"stats": {
"node_count": len(processed_nodes),
"edge_count": len(processed_edges),
"gpu_accelerated": request.gpu_acceleration,
"clustered": request.clustering,
"layout_type": request.layout_type,
"has_embed_url": embed_url is not None,
"has_local_viz": local_viz_data is not None
},
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error processing graph data: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ... (include other PyGraphistry methods from original service)
async def _apply_gpu_acceleration(self, g, request):
# Implementation from original service
pass
async def _apply_clustering(self, g):
# Implementation from original service
pass
async def _generate_layout(self, g, layout_type):
# Implementation from original service
pass
def _create_local_viz_data(self, g, processed_nodes, processed_edges):
# Implementation from original service
pass
class UnifiedGPUService:
"""Unified service offering both PyGraphistry cloud and local GPU processing"""
"""Unified service offering local GPU and CPU processing"""
def __init__(self):
self.pygraphistry_processor = PyGraphistryProcessor()
self.local_gpu_processor = LocalGPUProcessor()
self.generation_tasks = {}
self.executor = ThreadPoolExecutor(max_workers=4)
@ -318,12 +345,8 @@ class UnifiedGPUService:
async def process_graph(self, request: UnifiedVisualizationRequest) -> Dict[str, Any]:
"""Process graph with selected processing mode"""
if request.processing_mode == ProcessingMode.PYGRAPHISTRY_CLOUD:
return await self.pygraphistry_processor.process_graph_data(request)
elif request.processing_mode == ProcessingMode.LOCAL_GPU:
if request.processing_mode == ProcessingMode.LOCAL_GPU:
return await self._process_with_local_gpu(request)
else: # LOCAL_CPU
return await self._process_with_local_cpu(request)
@ -435,7 +458,7 @@ service = UnifiedGPUService()
@app.post("/api/visualize")
async def visualize_graph(request: UnifiedVisualizationRequest):
"""Process graph with unified service (supports all processing modes)"""
"""Process graph with unified service (supports local GPU and CPU modes)"""
result = await service.process_graph(request)
# Broadcast to connected WebSocket clients
@ -462,17 +485,13 @@ async def get_capabilities():
"""Get GPU capabilities and available processing modes"""
return {
"processing_modes": {
"pygraphistry_cloud": {
"available": service.pygraphistry_processor.initialized,
"description": "PyGraphistry cloud GPU processing with interactive embeds"
},
"local_gpu": {
"available": HAS_RAPIDS,
"description": "Local GPU processing with cuGraph/RAPIDS"
},
"local_cpu": {
"available": True,
"description": "Local CPU fallback processing"
"description": "Local CPU fallback processing with NetworkX"
}
},
"has_rapids": HAS_RAPIDS,
@ -524,7 +543,6 @@ async def health_check():
"""Health check endpoint"""
return {
"status": "healthy",
"pygraphistry_initialized": service.pygraphistry_processor.initialized,
"local_gpu_available": HAS_RAPIDS,
"torch_geometric": HAS_TORCH_GEOMETRIC,
"timestamp": datetime.now().isoformat()
@ -555,7 +573,6 @@ async def get_visualization_page():
<div>
<label>Processing Mode:</label>
<select id="processingMode">
<option value="pygraphistry_cloud">PyGraphistry Cloud</option>
<option value="local_gpu">Local GPU (cuGraph)</option>
<option value="local_cpu">Local CPU</option>
</select>
@ -740,23 +757,8 @@ def startup_diagnostics():
else:
print("⚠ PyTorch Geometric not available")
# Check PyGraphistry credentials
print("Checking PyGraphistry credentials...")
personal_key = os.getenv('GRAPHISTRY_PERSONAL_KEY')
secret_key = os.getenv('GRAPHISTRY_SECRET_KEY')
api_key = os.getenv('GRAPHISTRY_API_KEY')
if personal_key and secret_key:
print("✓ PyGraphistry personal key/secret found")
elif api_key:
print("✓ PyGraphistry API key found")
else:
print("⚠ No PyGraphistry credentials found - cloud mode will be limited")
print(" Set GRAPHISTRY_PERSONAL_KEY + GRAPHISTRY_SECRET_KEY for full cloud features")
print("")
print("🎯 Available Processing Modes:")
print(" ☁️ PyGraphistry Cloud - Interactive GPU embeds (requires credentials)")
print(" 🚀 Local GPU (cuGraph) - Full local GPU processing")
print(" 💻 Local CPU - NetworkX fallback")
print("")
@ -770,4 +772,4 @@ def startup_diagnostics():
if __name__ == "__main__":
startup_diagnostics()
uvicorn.run(app, host="0.0.0.0", port=8080)
uvicorn.run(app, host="0.0.0.0", port=8080)