"""Kubernetes API client wrapper."""

from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from kubernetes import client, config
from kubernetes.client.exceptions import ApiException
from rich.console import Console
from rich.table import Table

# Module-wide Rich console; every status message and table in this file is printed through it.
console = Console()
class K8sClient:
    """Convenience wrapper around the Kubernetes Python client.

    Read helpers return plain lists/dicts built from the API objects;
    mutating helpers return ``True`` on success and ``False`` on failure.
    ``ApiException`` errors are printed to the module console rather than
    raised. Only ``__init__`` re-raises.
    """

    def __init__(self):
        """Load the kubeconfig and create the CoreV1 / AppsV1 API handles.

        Raises:
            Exception: any error raised while loading the configuration or
                constructing the API objects. The error is printed first
                and then re-raised unchanged.
        """
        try:
            config.load_kube_config()
            self.v1 = client.CoreV1Api()
            self.apps_v1 = client.AppsV1Api()
            console.print("[green]✓[/green] Connected to Kubernetes cluster")
        except Exception as e:
            console.print(f"[red]✗[/red] Failed to connect to Kubernetes: {e}")
            raise

    def get_namespaces(self) -> List[str]:
        """Return the names of all namespaces, sorted alphabetically.

        Returns:
            Sorted namespace names, or an empty list if the API call fails.
        """
        try:
            response = self.v1.list_namespace()
            return sorted(ns.metadata.name for ns in response.items)
        except ApiException as e:
            console.print(f"[red]Error fetching namespaces:[/red] {e}")
            return []

    def get_deployments(self, namespace: str) -> List[Dict[str, Any]]:
        """Return a summary dict for every deployment in ``namespace``.

        Each dict has the keys ``name``, ``replicas``, ``available`` and
        ``ready``. Missing available/ready counts are reported as ``0``.

        Returns:
            One dict per deployment, or an empty list if the API call fails.
        """
        try:
            response = self.apps_v1.list_namespaced_deployment(namespace)
            return [
                {
                    'name': dep.metadata.name,
                    'replicas': dep.spec.replicas,
                    'available': dep.status.available_replicas or 0,
                    'ready': dep.status.ready_replicas or 0,
                }
                for dep in response.items
            ]
        except ApiException as e:
            console.print(f"[red]Error fetching deployments:[/red] {e}")
            return []

    def get_pods(self, namespace: str, label_selector: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return a summary dict for every pod in ``namespace``.

        Args:
            namespace: Namespace to list pods from.
            label_selector: Optional label selector; only forwarded to the
                API when it is non-empty.

        Returns:
            One dict per pod with the keys ``name``, ``status``, ``ready``,
            ``total_containers``, ``restarts`` and ``age``; an empty list
            if the API call fails.
        """
        # Only pass the selector when one was given, so the unfiltered call
        # is issued exactly as before.
        query: Dict[str, Any] = {}
        if label_selector:
            query['label_selector'] = label_selector

        try:
            response = self.v1.list_namespaced_pod(namespace, **query)

            rows = []
            for pod in response.items:
                # container_statuses can be None; treat that as "no containers reported".
                statuses = pod.status.container_statuses or []
                rows.append({
                    'name': pod.metadata.name,
                    'status': pod.status.phase,
                    'ready': sum(1 for c in statuses if c.ready),
                    'total_containers': len(pod.spec.containers),
                    'restarts': sum(c.restart_count for c in statuses),
                    'age': self._calculate_age(pod.metadata.creation_timestamp),
                })
            return rows
        except ApiException as e:
            console.print(f"[red]Error fetching pods:[/red] {e}")
            return []

    def get_configmaps(self, namespace: str) -> List[Dict[str, Any]]:
        """Return a summary dict for every ConfigMap in ``namespace``.

        Returns:
            One dict per ConfigMap with the keys ``name``, ``data_keys``
            (list of keys, empty when there is no data) and ``age``; an
            empty list if the API call fails.
        """
        try:
            response = self.v1.list_namespaced_config_map(namespace)
            return [
                {
                    'name': cm.metadata.name,
                    'data_keys': list(cm.data) if cm.data else [],
                    'age': self._calculate_age(cm.metadata.creation_timestamp),
                }
                for cm in response.items
            ]
        except ApiException as e:
            console.print(f"[red]Error fetching ConfigMaps:[/red] {e}")
            return []

    def get_configmap_data(self, namespace: str, name: str) -> Optional[Dict[str, str]]:
        """Return the ``data`` mapping of one ConfigMap.

        Returns:
            The ConfigMap's ``data`` attribute as returned by the API, or
            ``None`` if the read fails.
        """
        try:
            return self.v1.read_namespaced_config_map(name, namespace).data
        except ApiException as e:
            console.print(f"[red]Error reading ConfigMap:[/red] {e}")
            return None

    def update_configmap(self, namespace: str, name: str, data: Dict[str, str]) -> bool:
        """Set the ``data`` of an existing ConfigMap and patch it.

        The ConfigMap is read first, its ``data`` attribute is replaced
        with ``data``, and the resulting object is sent as a patch.

        Returns:
            ``True`` when the patch succeeds, ``False`` on an API error.
        """
        try:
            current = self.v1.read_namespaced_config_map(name, namespace)
            current.data = data
            # NOTE(review): this is a patch, not a replace. Under merge-patch
            # semantics, keys missing from `data` are presumably left in place
            # server-side. Confirm; if full replacement is intended,
            # replace_namespaced_config_map may be the better call.
            self.v1.patch_namespaced_config_map(name, namespace, current)
            console.print(f"[green]✓[/green] ConfigMap {name} updated successfully")
            return True
        except ApiException as e:
            console.print(f"[red]Error updating ConfigMap:[/red] {e}")
            return False

    def restart_deployment(self, namespace: str, name: str) -> bool:
        """Restart a deployment by stamping its pod template annotation.

        Patches ``spec.template.metadata.annotations`` with
        ``kubectl.kubernetes.io/restartedAt`` set to the current UTC time.

        Returns:
            ``True`` when the patch succeeds, ``False`` on an API error.
        """
        try:
            # datetime.utcnow() is deprecated since Python 3.12. Build the
            # same "...Z" string from an aware UTC timestamp instead.
            stamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
            body = {
                'spec': {
                    'template': {
                        'metadata': {
                            'annotations': {
                                'kubectl.kubernetes.io/restartedAt': stamp,
                            },
                        },
                    },
                },
            }
            self.apps_v1.patch_namespaced_deployment(name, namespace, body)
            console.print(f"[green]✓[/green] Deployment {name} restarted")
            return True
        except ApiException as e:
            console.print(f"[red]Error restarting deployment:[/red] {e}")
            return False

    def scale_deployment(self, namespace: str, name: str, replicas: int) -> bool:
        """Patch a deployment's scale to ``replicas``.

        Returns:
            ``True`` when the patch succeeds, ``False`` on an API error.
        """
        try:
            body = {'spec': {'replicas': replicas}}
            self.apps_v1.patch_namespaced_deployment_scale(name, namespace, body)
            console.print(f"[green]✓[/green] Deployment {name} scaled to {replicas} replicas")
            return True
        except ApiException as e:
            console.print(f"[red]Error scaling deployment:[/red] {e}")
            return False

    def get_pod_logs(self, namespace: str, pod_name: str, container: Optional[str] = None,
                     tail_lines: int = 100) -> Optional[str]:
        """Fetch the last ``tail_lines`` log lines of a pod.

        Args:
            namespace: Namespace of the pod.
            pod_name: Name of the pod.
            container: Optional container name; only forwarded to the API
                when it is non-empty.
            tail_lines: Value passed as ``tail_lines`` to the log request.

        Returns:
            Whatever the log API call returns, or ``None`` on an API error.
        """
        try:
            request = {'name': pod_name, 'namespace': namespace, 'tail_lines': tail_lines}
            if container:
                request['container'] = container
            return self.v1.read_namespaced_pod_log(**request)
        except ApiException as e:
            console.print(f"[red]Error fetching logs:[/red] {e}")
            return None

    def get_pod_containers(self, namespace: str, pod_name: str) -> List[str]:
        """Return the container names declared in a pod's spec.

        Returns:
            Container names in spec order, or an empty list on an API error.
        """
        try:
            pod = self.v1.read_namespaced_pod(pod_name, namespace)
            return [c.name for c in pod.spec.containers]
        except ApiException as e:
            console.print(f"[red]Error fetching pod info:[/red] {e}")
            return []

    @staticmethod
    def _calculate_age(timestamp: Optional[datetime]) -> str:
        """Format the time elapsed since ``timestamp`` as a short string.

        The result uses the largest whole unit that fits: ``"<n>d"``,
        ``"<n>h"``, ``"<n>m"`` or ``"<n>s"``.

        Args:
            timestamp: Creation time. "Now" is taken in the timestamp's own
                ``tzinfo`` so aware and naive values are each compared
                like-for-like.

        Returns:
            ``"Unknown"`` when ``timestamp`` is falsy, otherwise the age
            string. A timestamp in the future is reported as ``"0s"``.
        """
        if not timestamp:
            return "Unknown"

        elapsed = datetime.now(timestamp.tzinfo) - timestamp
        # Work from total seconds, clamped at zero. A negative timedelta is
        # normalised to days=-1 with a large positive `seconds` field, which
        # would otherwise show a slightly-future timestamp as "23h".
        seconds = max(int(elapsed.total_seconds()), 0)

        if seconds >= 86400:
            return f"{seconds // 86400}d"
        if seconds >= 3600:
            return f"{seconds // 3600}h"
        if seconds >= 60:
            return f"{seconds // 60}m"
        return f"{seconds}s"

    def display_deployments_table(self, deployments: List[Dict[str, Any]]):
        """Print deployment summaries (as built by ``get_deployments``) as a table."""
        table = Table(title="Deployments")
        table.add_column("Name", style="cyan")
        table.add_column("Replicas", style="magenta")
        table.add_column("Available", style="green")
        table.add_column("Ready", style="yellow")

        for dep in deployments:
            table.add_row(
                dep['name'],
                str(dep['replicas']),
                str(dep['available']),
                str(dep['ready']),
            )

        console.print(table)

    def display_pods_table(self, pods: List[Dict[str, Any]]):
        """Print pod summaries (as built by ``get_pods``) as a table.

        The status cell is green for ``"Running"`` and red for anything else.
        """
        table = Table(title="Pods")
        table.add_column("Name", style="cyan")
        table.add_column("Status", style="magenta")
        table.add_column("Ready", style="green")
        table.add_column("Restarts", style="yellow")
        table.add_column("Age", style="blue")

        for pod in pods:
            status_color = "green" if pod['status'] == "Running" else "red"
            table.add_row(
                pod['name'],
                f"[{status_color}]{pod['status']}[/{status_color}]",
                f"{pod['ready']}/{pod['total_containers']}",
                str(pod['restarts']),
                pod['age'],
            )

        console.print(table)

    def display_configmaps_table(self, configmaps: List[Dict[str, Any]]):
        """Print ConfigMap summaries (as built by ``get_configmaps``) as a table.

        Keys are joined with ``", "``; a ConfigMap without keys shows ``"None"``.
        """
        table = Table(title="ConfigMaps")
        table.add_column("Name", style="cyan")
        table.add_column("Keys", style="magenta")
        table.add_column("Age", style="blue")

        for cm in configmaps:
            table.add_row(
                cm['name'],
                ", ".join(cm['data_keys']) if cm['data_keys'] else "None",
                cm['age'],
            )

        console.print(table)