coyote/ssacli_tool/ssacli_tool.py

1040 lines
42 KiB
Python

#!/usr/bin/env python3
"""
SSACLI Python Automation Tool
Automatisiert SSACLI-Befehle und parst die Ausgaben für einfache Verwaltung von HP Smart Array Controllern
"""
import argparse
import json
import os
import re
import shlex
import subprocess
from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Any

import yaml
class OutputFormat(Enum):
    """Available output formats."""
    # The values are the strings accepted by the --format command line option.
    TEXT = "text"
    JSON = "json"
    YAML = "yaml"
@dataclass
class PhysicalDrive:
    """Represents a physical disk drive."""
    slot: str  # drive address as printed after "physicaldrive", e.g. "1I:1:1"
    status: str  # status text taken from the drive listing, e.g. "OK"
    size: str  # size text as reported, e.g. "300 GB"
    interface: str  # interface text as reported, e.g. "SAS"
    model: str
    serial: str
    temperature: Optional[str] = None  # raw text of the temperature field, if found
@dataclass
class LogicalDrive:
    """Represents a logical drive."""
    drive_id: str  # number printed after "logicaldrive"
    size: str  # size text as reported, e.g. "279.4 GB"
    raid_level: str  # RAID level text as reported, e.g. "RAID 1"
    status: str
    mount_point: Optional[str] = None  # NOTE(review): never set by the code visible in this file
@dataclass
class Array:
    """Represents a physical array."""
    array_id: str  # array letter(s), e.g. "A"
    array_type: str
    interface: str
    size: str
    unused_space: Optional[str] = None  # unused-space text as reported, e.g. "558.9 GB"
    status: str = "Unknown"
    drive_count: int = 0
    # None is only a placeholder default; __post_init__ replaces it with a fresh list.
    drives: Optional[List[str]] = None

    def __post_init__(self):
        """Replace a missing drive list with a new empty list per instance."""
        if self.drives is None:
            self.drives = []
@dataclass
class UnassignedDrive:
    """Represents a drive that is not assigned to any array."""
    slot: str
    size: str
    interface: str
    model: str
    serial: str
@dataclass
class Controller:
    """Represents a Smart Array controller."""
    slot: str  # slot number as text, e.g. "0"
    model: str
    serial: str
    cache_status: str
    battery_status: Optional[str] = None
    unassigned_space: Optional[str] = None  # NOTE(review): controller discovery always leaves this as None
class SSACLIManager:
    """Main class for SSACLI operations."""

    def __init__(self, ssacli_path: str = "ssacli"):
        """
        Initialize the SSACLI manager.

        Args:
            ssacli_path: Path to the SSACLI executable (default: "ssacli")
        """
        self.ssacli_path = ssacli_path
        # Starts empty; replaced by the result of a successful controller discovery.
        self.controllers = []
def run_command(self, command: str) -> str:
    """
    Run an SSACLI command and return its standard output.

    The command is executed directly, without a shell, so characters in
    drive or slot arguments are never interpreted as shell syntax and a
    missing executable is reported as such.

    Args:
        command: SSACLI command without the leading "ssacli" prefix

    Returns:
        Captured standard output of the command

    Raises:
        subprocess.CalledProcessError: the command exited with a non-zero
            status; ``output`` and ``stderr`` carry the captured streams
        Exception: the command timed out or the executable was not found
    """
    full_command = f"{self.ssacli_path} {command}"
    # On Windows the command line is handed over as one string; elsewhere it
    # is split with shell-like quoting rules so quoted paths keep working.
    argv = full_command if os.name == "nt" else shlex.split(full_command)
    try:
        result = subprocess.run(
            argv,
            shell=False,
            capture_output=True,
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired as exc:
        raise Exception(f"Timeout beim Ausführen von: {full_command}") from exc
    except FileNotFoundError as exc:
        raise Exception(f"SSACLI nicht gefunden. Bitte Pfad überprüfen: {self.ssacli_path}") from exc
    if result.returncode != 0:
        raise subprocess.CalledProcessError(
            result.returncode,
            full_command,
            output=result.stdout,
            stderr=result.stderr,
        )
    return result.stdout
def discover_controllers(self) -> List[Controller]:
    """
    Discover all available Smart Array controllers.

    Runs "ctrl all show", and for every controller line found queries the
    controller details to read serial number, cache status and
    battery/capacitor status. On success the result is also stored in
    ``self.controllers``.

    Returns:
        List of the controllers found; an empty list if any command fails
    """
    try:
        output = self.run_command("ctrl all show")
        controllers = []
        for line in output.split('\n'):
            if 'Smart Array' not in line or 'Slot' not in line:
                continue
            # Example: "Smart Array P420i in Slot 0 (Embedded)"
            slot_match = re.search(r'Slot (\d+)', line)
            # The model name ends where " in Slot" (or a parenthesised
            # suffix) begins; it must not swallow the slot text.
            model_match = re.search(r'Smart Array (.+?)(?: in Slot|\s*\(|$)', line)
            if not (slot_match and model_match):
                continue
            slot = slot_match.group(1)
            model = model_match.group(1).strip()
            # Fetch the detailed controller information
            detail_output = self.run_command(f"ctrl slot={slot} show")
            serial = self._extract_field(detail_output, "Serial Number")
            cache_status = self._extract_field(detail_output, "Cache Status")
            battery_status = self._extract_field(detail_output, "Battery/Capacitor Status")
            controllers.append(Controller(
                slot=slot,
                model=model,
                serial=serial or "N/A",
                cache_status=cache_status or "Unknown",
                battery_status=battery_status,
                unassigned_space=None,  # not determined here
            ))
        self.controllers = controllers
        return controllers
    except Exception as e:
        # Best effort: report the problem and return nothing.
        print(f"Fehler beim Entdecken der Controller: {e}")
        return []
def get_physical_drives(self, controller_slot: str) -> List[PhysicalDrive]:
    """
    Fetch all physical drives of a controller.

    Parses the "pd all show" listing and issues one detail query per drive
    for model, serial number and temperature.

    Args:
        controller_slot: Controller slot number

    Returns:
        List of physical drives; an empty list if any command fails
    """
    # Example: "physicaldrive 1I:1:1 (port 1I:box 1:bay 1, SAS, 300 GB, OK)"
    drive_pattern = re.compile(r'physicaldrive (\S+).*?\(([^,]+),\s*([^,]+),\s*([^,]+),\s*([^)]+)')
    try:
        output = self.run_command(f"ctrl slot={controller_slot} pd all show")
        drives = []
        for line in output.split('\n'):
            if 'physicaldrive' not in line.lower():
                continue
            match = drive_pattern.search(line)
            if not match:
                continue
            slot = match.group(1)
            interface = match.group(3).strip()
            size = match.group(4).strip()
            status = match.group(5).strip()
            # Detailed drive information
            detail_output = self.run_command(f"ctrl slot={controller_slot} pd {slot} show")
            model = self._extract_field(detail_output, "Model")
            serial = self._extract_field(detail_output, "Serial Number")
            # The field label carries a unit suffix in ssacli output
            # ("Current Temperature (C)"); keep the bare label as fallback.
            temp = (
                self._extract_field(detail_output, "Current Temperature (C)")
                or self._extract_field(detail_output, "Current Temperature")
            )
            drives.append(PhysicalDrive(
                slot=slot,
                status=status,
                size=size,
                interface=interface,
                model=model or "Unknown",
                serial=serial or "N/A",
                temperature=temp,
            ))
        return drives
    except Exception as e:
        # Best effort: report the problem and return nothing.
        print(f"Fehler beim Abrufen der physischen Laufwerke: {e}")
        return []
def get_unassigned_drives(self, controller_slot: str):
    """
    Fetch the physical drives of a controller that are not assigned to an array.

    Parses the "pd allunassigned show" listing and issues one detail query
    per drive for model, serial number and temperature.

    Args:
        controller_slot: Controller slot number

    Returns:
        List of PhysicalDrive objects; an empty list if any command fails
    """
    # Example: "physicaldrive 1I:1:1 (port 1I:box 1:bay 1, SAS, 300 GB, OK)"
    drive_pattern = re.compile(r'physicaldrive (\S+).*?\(([^,]+),\s*([^,]+),\s*([^,]+),\s*([^)]+)')
    try:
        output = self.run_command(f"ctrl slot={controller_slot} pd allunassigned show")
        drives = []
        for line in output.split('\n'):
            if 'physicaldrive' not in line.lower():
                continue
            match = drive_pattern.search(line)
            if not match:
                continue
            slot = match.group(1)
            interface = match.group(3).strip()
            size = match.group(4).strip()
            status = match.group(5).strip()
            # Detailed drive information
            detail_output = self.run_command(f"ctrl slot={controller_slot} pd {slot} show")
            model = self._extract_field(detail_output, "Model")
            serial = self._extract_field(detail_output, "Serial Number")
            # The field label carries a unit suffix in ssacli output
            # ("Current Temperature (C)"); keep the bare label as fallback.
            temp = (
                self._extract_field(detail_output, "Current Temperature (C)")
                or self._extract_field(detail_output, "Current Temperature")
            )
            drives.append(PhysicalDrive(
                slot=slot,
                status=status,
                size=size,
                interface=interface,
                model=model or "Unknown",
                serial=serial or "N/A",
                temperature=temp,
            ))
        return drives
    except Exception as e:
        # Best effort: report the problem and return nothing.
        print(f"Fehler beim Abrufen der unassigned Laufwerke: {e}")
        return []
def get_logical_drives(self, controller_slot: str) -> List[LogicalDrive]:
    """
    Fetch all logical drives of a controller.

    Args:
        controller_slot: Controller slot number

    Returns:
        List of logical drives parsed from the "ld all show" listing;
        an empty list if the command fails
    """
    try:
        listing = self.run_command(f"ctrl slot={controller_slot} ld all show")
        volumes = []
        for entry in listing.split('\n'):
            if 'logicaldrive' not in entry.lower():
                continue
            # Example: "logicaldrive 1 (279.4 GB, RAID 1, OK)"
            parsed = re.search(r'logicaldrive (\d+) \(([^,]+),\s*([^,]+),\s*([^)]+)', entry)
            if not parsed:
                continue
            number, size_text, level_text, state_text = parsed.groups()
            volumes.append(LogicalDrive(
                drive_id=number,
                size=size_text.strip(),
                raid_level=level_text.strip(),
                status=state_text.strip(),
            ))
        return volumes
    except Exception as e:
        # Best effort: report the problem and return nothing.
        print(f"Fehler beim Abrufen der logischen Laufwerke: {e}")
        return []
def get_arrays(self, controller_slot: str) -> List[Array]:
    """
    Fetch all arrays of a controller.

    Each array header of the "array all show" listing is followed up with a
    detail query. If that detail query fails, the array is still returned
    with the header information and "Unknown" placeholders.

    Args:
        controller_slot: Controller slot number

    Returns:
        List of arrays including their unused space; an empty list if the
        listing command fails
    """
    try:
        listing = self.run_command(f"ctrl slot={controller_slot} array all show")
        arrays = []
        for raw_line in listing.split('\n'):
            entry = raw_line.strip()
            # Only array header lines are of interest
            if not (entry.startswith('array ') and '(' in entry):
                continue
            # Example: "array A (SAS, Unused Space: 558.9 GB)"
            header = re.search(r'array ([A-Z]+) \(([^,]+)(?:,\s*Unused Space:\s*([^)]+))?\)', entry)
            if not header:
                continue
            array_id = header.group(1)
            interface = header.group(2).strip()
            unused_space = header.group(3) or "0 GB"
            try:
                detail_output = self.run_command(f"ctrl slot={controller_slot} array {array_id} show")
                array_type = self._extract_field(detail_output, "Array Type") or "Unknown"
                status = self._extract_field(detail_output, "Status") or "Unknown"
                size_match = re.search(r'Array Size:\s*([^,\n]+)', detail_output)
                size = size_match.group(1).strip() if size_match else "Unknown"
                # One scan yields both the member slots and their count
                drive_slots = re.findall(r'physicaldrive (\d+[A-Z]:\d+:\d+)', detail_output)
                record = Array(
                    array_id=array_id,
                    array_type=array_type,
                    interface=interface,
                    size=size,
                    unused_space=unused_space,
                    status=status,
                    drive_count=len(drive_slots),
                    drives=drive_slots,
                )
            except Exception as detail_error:
                print(f"Warnung: Konnte Details für Array {array_id} nicht abrufen: {detail_error}")
                # Fall back to what the header line provided
                record = Array(
                    array_id=array_id,
                    array_type="Unknown",
                    interface=interface,
                    size="Unknown",
                    unused_space=unused_space,
                    status="Unknown",
                    drive_count=0,
                    drives=[],
                )
            arrays.append(record)
        return arrays
    except Exception as e:
        # Best effort: report the problem and return nothing.
        print(f"Fehler beim Abrufen der Arrays: {e}")
        return []
def calculate_array_unused_space(self, controller_slot: str) -> Dict[str, Any]:
    """
    Calculate the unused space inside existing arrays.

    Unused-space texts given in MB, GB or TB are converted to GB. For each
    array with unused space a few logical drive sizes are suggested.

    Args:
        controller_slot: Controller slot number

    Returns:
        Dictionary with the keys "total_unused_space_gb",
        "total_unused_space", "arrays_with_unused_space", "array_details"
        and "possible_logical_drives". If anything fails, the same keys are
        returned with zero/empty values.
    """
    # Factors that convert a value in the given unit to GB.
    unit_to_gb = {"MB": 1 / 1024, "GB": 1, "TB": 1024}
    try:
        arrays = self.get_arrays(controller_slot)
        total_unused_gb = 0.0
        array_details = []
        possible_logical_drives = {}
        for array in arrays:
            unused_space_gb = 0.0
            # Parse the unused-space text
            if array.unused_space and array.unused_space != "0 GB":
                unused_match = re.search(r'(\d+(?:\.\d+)?)\s*(MB|GB|TB)', array.unused_space, re.IGNORECASE)
                if unused_match:
                    factor = unit_to_gb[unused_match.group(2).upper()]
                    unused_space_gb = float(unused_match.group(1)) * factor
            total_unused_gb += unused_space_gb
            array_details.append({
                "array_id": array.array_id,
                "array_type": array.array_type,
                "interface": array.interface,
                "total_size": array.size,
                "unused_space": array.unused_space,
                "unused_space_gb": unused_space_gb,
                "status": array.status,
                "drive_count": array.drive_count,
                "drives": array.drives
            })
            if unused_space_gb <= 0:
                continue
            # Suggested logical drive sizes for this array
            if unused_space_gb >= 1000:  # 1 TB and more
                suggested_sizes = [
                    f"{unused_space_gb:.0f} GB (kompletter verfügbarer Platz)",
                    "500 GB",
                    "100 GB"
                ]
            elif unused_space_gb >= 100:  # 100 GB and more
                suggested_sizes = [
                    f"{unused_space_gb:.0f} GB (kompletter verfügbarer Platz)",
                    "50 GB",
                    "20 GB"
                ]
            else:  # less than 100 GB
                suggested_sizes = [
                    f"{unused_space_gb:.1f} GB (kompletter verfügbarer Platz)"
                ]
            possible_logical_drives[f"Array {array.array_id} ({array.interface})"] = {
                "available_space_gb": unused_space_gb,
                "available_space": f"{unused_space_gb:.1f} GB",
                "suggested_sizes": suggested_sizes
            }
        return {
            "total_unused_space_gb": total_unused_gb,
            "total_unused_space": f"{total_unused_gb:.1f} GB",
            "arrays_with_unused_space": len([a for a in array_details if a["unused_space_gb"] > 0]),
            "array_details": array_details,
            "possible_logical_drives": possible_logical_drives
        }
    except Exception as e:
        # Best effort: report the problem and return an empty result.
        print(f"Fehler beim Berechnen des ungenutzten Array-Speichers: {e}")
        return {
            "total_unused_space_gb": 0.0,
            "total_unused_space": "0 GB",
            "arrays_with_unused_space": 0,
            "array_details": [],
            "possible_logical_drives": {}
        }
def calculate_available_space(self, controller_slot: str) -> Dict[str, Any]:
    """
    Calculate the space available for new logical drives.

    Unassigned drives are grouped by identical size text and interface, and
    for each group the RAID levels that the drive count allows are listed
    with their usable capacity. Drives whose size text cannot be parsed
    (MB, GB or TB expected) are listed but not grouped.

    Args:
        controller_slot: Controller slot number

    Returns:
        Dictionary with the keys "total_unassigned_drives",
        "available_space", "possible_raids", "drives" and "drive_groups".
        If there are no unassigned drives or anything fails, the same keys
        are returned with zero/empty values.
    """
    # Factors that convert a value in the given unit to GB.
    unit_to_gb = {"MB": 1 / 1024, "GB": 1, "TB": 1024}
    try:
        unassigned_drives = self.get_unassigned_drives(controller_slot)
        if not unassigned_drives:
            return {
                "total_unassigned_drives": 0,
                "available_space": "0 GB",
                "possible_raids": {},
                "drives": [],
                "drive_groups": {}
            }
        # Group drives by size and interface
        drive_groups = {}
        total_space_gb = 0
        for drive in unassigned_drives:
            size_match = re.search(r'(\d+(?:\.\d+)?)\s*(MB|GB|TB)', drive.size, re.IGNORECASE)
            if not size_match:
                continue
            size_value = float(size_match.group(1)) * unit_to_gb[size_match.group(2).upper()]
            key = f"{drive.size}_{drive.interface}"
            group = drive_groups.setdefault(key, {
                "size": drive.size,
                "size_gb": size_value,
                "interface": drive.interface,
                "count": 0,
                "drives": []
            })
            group["count"] += 1
            group["drives"].append(drive.slot)
            total_space_gb += size_value
        # Work out the possible RAID configurations per group
        possible_raids = {}
        for group in drive_groups.values():
            count = group["count"]
            size_gb = group["size_gb"]
            group_raids = {}
            # RAID 0: at least 2 drives, full capacity
            if count >= 2:
                raid0_capacity = count * size_gb
                group_raids["RAID 0"] = {
                    "drives_needed": count,
                    "capacity_gb": raid0_capacity,
                    "description": f"Alle {count} Laufwerke, {raid0_capacity:.1f} GB nutzbar"
                }
            # RAID 1: mirrored pairs; an odd drive is left over
            if count >= 2:
                pairs = count // 2
                raid1_capacity = pairs * size_gb
                group_raids["RAID 1"] = {
                    "drives_needed": pairs * 2,
                    "capacity_gb": raid1_capacity,
                    "description": f"{pairs} Paar(e), {raid1_capacity:.1f} GB nutzbar"
                }
            # RAID 5: at least 3 drives, one drive's worth of parity
            if count >= 3:
                raid5_capacity = (count - 1) * size_gb
                group_raids["RAID 5"] = {
                    "drives_needed": count,
                    "capacity_gb": raid5_capacity,
                    "description": f"{count} Laufwerke, {raid5_capacity:.1f} GB nutzbar"
                }
            # RAID 6: at least 4 drives, two drives' worth of parity
            if count >= 4:
                raid6_capacity = (count - 2) * size_gb
                group_raids["RAID 6"] = {
                    "drives_needed": count,
                    "capacity_gb": raid6_capacity,
                    "description": f"{count} Laufwerke, {raid6_capacity:.1f} GB nutzbar"
                }
            # RAID 10: at least 4 drives and an even count
            if count >= 4 and count % 2 == 0:
                raid10_capacity = (count // 2) * size_gb
                group_raids["RAID 10"] = {
                    "drives_needed": count,
                    "capacity_gb": raid10_capacity,
                    "description": f"{count} Laufwerke, {raid10_capacity:.1f} GB nutzbar"
                }
            if group_raids:
                possible_raids[f"{group['size']} {group['interface']} ({count}x)"] = group_raids
        return {
            "total_unassigned_drives": len(unassigned_drives),
            "available_space": f"{total_space_gb:.1f} GB",
            "possible_raids": possible_raids,
            "drives": [
                {
                    "slot": drive.slot,
                    "size": drive.size,
                    "interface": drive.interface,
                    "model": drive.model,
                    "serial": drive.serial
                }
                for drive in unassigned_drives
            ],
            "drive_groups": drive_groups
        }
    except Exception as e:
        # Best effort: report the problem and return an empty result.
        print(f"Fehler beim Berechnen des verfügbaren Speichers: {e}")
        return {
            "total_unassigned_drives": 0,
            "available_space": "0 GB",
            "possible_raids": {},
            "drives": [],
            "drive_groups": {}
        }
def create_raid_array(self, controller_slot: str, raid_level: str, drives: List[str]) -> bool:
    """
    Create a new logical drive (and its array) from the given physical drives.

    The command counts as successful when it runs without raising and its
    output contains no error text; a successful create may print nothing.

    Args:
        controller_slot: Controller slot number
        raid_level: RAID level (0, 1, 5, 6, 10)
        drives: List of physical drive slots

    Returns:
        True on success, False on failure
    """
    if not drives:
        # Without drives the command would be malformed; do not run it.
        print("Fehler beim Erstellen des RAID-Arrays: keine Laufwerke angegeben")
        return False
    try:
        drives_str = ",".join(drives)
        command = f"ctrl slot={controller_slot} create type=ld drives={drives_str} raid={raid_level}"
        output = self.run_command(command)
    except Exception as e:
        print(f"Fehler beim Erstellen des RAID-Arrays: {e}")
        return False
    # Reaching this point means the command did not raise; only an explicit
    # error message in the output is treated as failure.
    return "error" not in output.lower()
def generate_report(self) -> Dict[str, Any]:
    """
    Build a full report covering every controller and its drives.

    Returns:
        Dictionary with a "timestamp" and a "controllers" list; each entry
        holds the controller data, its physical and logical drives, the
        available-space information and the unused array space.
    """
    report = {
        "timestamp": datetime.now().isoformat(),
        "controllers": []
    }
    for ctrl in self.discover_controllers():
        entry = {
            "controller": {
                "slot": ctrl.slot,
                "model": ctrl.model,
                "serial": ctrl.serial,
                "cache_status": ctrl.cache_status,
                "battery_status": ctrl.battery_status
            },
            "physical_drives": [],
            "logical_drives": []
        }
        # Physical drives
        entry["physical_drives"].extend(
            {
                "slot": disk.slot,
                "status": disk.status,
                "size": disk.size,
                "interface": disk.interface,
                "model": disk.model,
                "serial": disk.serial,
                "temperature": disk.temperature
            }
            for disk in self.get_physical_drives(ctrl.slot)
        )
        # Logical drives
        entry["logical_drives"].extend(
            {
                "drive_id": volume.drive_id,
                "size": volume.size,
                "raid_level": volume.raid_level,
                "status": volume.status
            }
            for volume in self.get_logical_drives(ctrl.slot)
        )
        # Available space with possible RAIDs, then unused array space
        entry["available_space"] = self.calculate_available_space(ctrl.slot)
        entry["array_unused_space"] = self.calculate_array_unused_space(ctrl.slot)
        report["controllers"].append(entry)
    return report
def print_summary(self, output_format: OutputFormat = OutputFormat.TEXT):
    """
    Print a summary of all system information in the requested format.

    Args:
        output_format: Desired output format (TEXT, JSON, YAML)

    Raises:
        ValueError: output_format is none of the known formats
    """
    if output_format == OutputFormat.TEXT:
        self._print_text_summary()
        return
    if output_format == OutputFormat.JSON:
        self._print_json_summary()
        return
    if output_format == OutputFormat.YAML:
        self._print_yaml_summary()
        return
    raise ValueError(f"Unbekanntes Ausgabeformat: {output_format}")
def _extract_field(self, detail_output, field):
"""
serial = self._extract_field(detail_output, "Serial Number")
Serial Number: PZXND0BRHH68OI
"""
try:
for line in iter(detail_output.splitlines()):
line = line.strip()
line = line.split(':')
if line[0] == field:
return(line[1])
except:
return('')
def _print_text_summary(self):
"""Druckt eine übersichtliche Text-Zusammenfassung für die Kommandozeile"""
print("=" * 80)
print("HP Smart Array Controller Zusammenfassung")
print("=" * 80)
controllers = self.discover_controllers()
if not controllers:
print("Keine Smart Array Controller gefunden!")
return
for controller in controllers:
print(f"\n📊 Controller Slot {controller.slot}: {controller.model}")
print(f" Serial: {controller.serial}")
print(f" Cache Status: {controller.cache_status}")
if controller.battery_status:
print(f" Battery Status: {controller.battery_status}")
# Physische Laufwerke
physical_drives = self.get_physical_drives(controller.slot)
print(f"\n💾 Physische Laufwerke ({len(physical_drives)}):")
for pd in physical_drives:
status_emoji = "" if pd.status == "OK" else ""
temp_info = f" ({pd.temperature})" if pd.temperature else ""
print(f" {status_emoji} {pd.slot}: {pd.size} {pd.interface} - {pd.status}{temp_info}")
# Logische Laufwerke
logical_drives = self.get_logical_drives(controller.slot)
print(f"\n🔗 Logische Laufwerke ({len(logical_drives)}):")
for ld in logical_drives:
status_emoji = "" if ld.status == "OK" else ""
print(f" {status_emoji} LD {ld.drive_id}: {ld.size} {ld.raid_level} - {ld.status}")
# Verfügbarer Speicher
available_info = self.calculate_available_space(controller.slot)
unassigned_count = available_info["total_unassigned_drives"]
if unassigned_count > 0:
print(f"\n💾 Nicht zugewiesener Speicher:")
print(f" 📦 {unassigned_count} Laufwerk(e) verfügbar ({available_info['available_space']})")
# Zeige verfügbare Laufwerke
for drive in available_info["drives"]:
print(f" 🔸 {drive['slot']}: {drive['size']} {drive['interface']} - {drive['model']}")
# Zeige mögliche RAID-Konfigurationen
if available_info["possible_raids"]:
print(f"\n🛠️ Mögliche RAID-Konfigurationen:")
for group_name, raids in available_info["possible_raids"].items():
print(f" 📋 {group_name}:")
for raid_type, raid_info in raids.items():
print(f"{raid_type}: {raid_info['description']}")
# Ungenutzter Array-Speicher
array_unused_info = self.calculate_array_unused_space(controller.slot)
total_unused = array_unused_info["total_unused_space_gb"]
arrays_with_unused = array_unused_info["arrays_with_unused_space"]
if total_unused > 0:
print(f"\n🔧 Ungenutzter Array-Speicher:")
print(f" 📊 {arrays_with_unused} Array(s) mit ungenutztem Platz ({array_unused_info['total_unused_space']})")
for array_info in array_unused_info["array_details"]:
if array_info["unused_space_gb"] > 0:
print(f" 🔸 Array {array_info['array_id']}: {array_info['unused_space']} verfügbar")
print(f" └─ {array_info['interface']}, {array_info['drive_count']} Laufwerke, Status: {array_info['status']}")
# Mögliche logische Laufwerke
if array_unused_info["possible_logical_drives"]:
print(f"\n💿 Mögliche neue logische Laufwerke:")
for array_name, ld_info in array_unused_info["possible_logical_drives"].items():
print(f" 📋 {array_name} ({ld_info['available_space']} verfügbar):")
for size in ld_info["suggested_sizes"]:
print(f"{size}")
if unassigned_count == 0 and total_unused == 0:
print(f"\n💾 Speicher-Status: Alle verfügbaren Laufwerke sind vollständig genutzt ✅")
def _print_json_summary(self):
"""Gibt eine JSON-formatierte Zusammenfassung aus"""
report = self.generate_report()
print(json.dumps(report, indent=2, ensure_ascii=False))
def _print_yaml_summary(self):
    """Print the full report as block-style YAML, keeping key order and Unicode."""
    rendered = yaml.dump(
        self.generate_report(),
        default_flow_style=False,
        allow_unicode=True,
        sort_keys=False,
    )
    print(rendered)
def export_report(self, filename: str, output_format: OutputFormat):
    """
    Write a detailed report to a file.

    The report is generated first; any error while writing is printed and
    not re-raised.

    Args:
        filename: Name of the target file
        output_format: Desired output format
    """
    report = self.generate_report()
    try:
        with open(filename, 'w', encoding='utf-8') as out:
            if output_format == OutputFormat.JSON:
                json.dump(report, out, indent=2, ensure_ascii=False)
            elif output_format == OutputFormat.YAML:
                yaml.dump(report, out, default_flow_style=False, allow_unicode=True, sort_keys=False)
            elif output_format == OutputFormat.TEXT:
                # Plain-text export: header first, then one section per controller
                rule = "=" * 80 + "\n"
                out.write(rule)
                out.write("HP Smart Array Controller Bericht\n")
                out.write(f"Generiert: {report['timestamp']}\n")
                out.write(rule + "\n")
                for section in report["controllers"]:
                    ctrl = section["controller"]
                    disks = section["physical_drives"]
                    volumes = section["logical_drives"]
                    out.write(f"Controller Slot {ctrl['slot']}: {ctrl['model']}\n")
                    out.write(f" Serial: {ctrl['serial']}\n")
                    out.write(f" Cache Status: {ctrl['cache_status']}\n")
                    if ctrl.get('battery_status'):
                        out.write(f" Battery Status: {ctrl['battery_status']}\n")
                    out.write(f"\n Physische Laufwerke ({len(disks)}):\n")
                    for disk in disks:
                        out.write(f" {disk['slot']}: {disk['size']} {disk['interface']} - {disk['status']}\n")
                    out.write(f"\n Logische Laufwerke ({len(volumes)}):\n")
                    for volume in volumes:
                        out.write(f" LD {volume['drive_id']}: {volume['size']} {volume['raid_level']} - {volume['status']}\n")
                    # Unassigned drives
                    free_info = section.get("available_space", {})
                    if free_info.get("total_unassigned_drives", 0) > 0:
                        out.write(f"\n Nicht zugewiesener Speicher: {free_info['available_space']}\n")
                        for spare in free_info.get("drives", []):
                            out.write(f" {spare['slot']}: {spare['size']} {spare['interface']}\n")
                    # Unused space inside arrays
                    unused_info = section.get("array_unused_space", {})
                    if unused_info.get("total_unused_space_gb", 0) > 0:
                        out.write(f"\n Ungenutzter Array-Speicher: {unused_info['total_unused_space']}\n")
                        for detail in unused_info.get("array_details", []):
                            if detail.get("unused_space_gb", 0) > 0:
                                out.write(f" Array {detail['array_id']}: {detail['unused_space']}\n")
                    out.write("\n" + "-" * 60 + "\n")
        print(f"✅ Bericht exportiert: {filename}")
    except Exception as e:
        print(f"❌ Fehler beim Exportieren: {e}")
def main():
    """
    Command line entry point.

    Parses the options, then either exports the report to a file or prints
    the summary to the console.

    Returns:
        0 on success, 1 on an error, 130 if interrupted by the user
    """
    usage_examples = """
Beispiele:
python3 ssacli_tool.py # Standard Text-Ausgabe
python3 ssacli_tool.py --format json # JSON-Ausgabe
python3 ssacli_tool.py --format yaml # YAML-Ausgabe
python3 ssacli_tool.py --export report.json # JSON-Export in Datei
python3 ssacli_tool.py --export report.yaml --format yaml # YAML-Export
python3 ssacli_tool.py --ssacli-path /usr/sbin/ssacli # Alternativer SSACLI-Pfad
Hinweis: Das Script muss als Administrator/Root ausgeführt werden.
"""
    parser = argparse.ArgumentParser(
        description="SSACLI Python Automation Tool - Verwaltung von HP Smart Array Controllern",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    parser.add_argument(
        '--format', '-f',
        type=str,
        choices=['text', 'json', 'yaml'],
        default='text',
        help='Ausgabeformat (Standard: text)',
    )
    parser.add_argument(
        '--export', '-e',
        type=str,
        help='Exportiere Bericht in angegebene Datei',
    )
    parser.add_argument(
        '--ssacli-path',
        type=str,
        default='ssacli',
        help='Pfad zur SSACLI-Executable (Standard: ssacli)',
    )
    parser.add_argument(
        '--quiet', '-q',
        action='store_true',
        help='Unterdrücke Statusmeldungen (nur Ausgabe/Export)',
    )
    parser.add_argument(
        '--version', '-v',
        action='version',
        version='SSACLI Python Tool v2.0',
    )
    args = parser.parse_args()
    chatty = not args.quiet

    # Map the option text to the output format
    try:
        output_format = OutputFormat(args.format)
    except ValueError:
        print(f"❌ Unbekanntes Format: {args.format}")
        return 1

    if chatty:
        print("🚀 SSACLI Python Automation Tool v2.0")
        if args.format != 'text':
            print(f"📋 Ausgabeformat: {args.format.upper()}")
        print("-" * 50)

    manager = SSACLIManager(args.ssacli_path)
    try:
        if args.export:
            # Export to a file
            if chatty:
                print(f"📤 Exportiere Bericht nach {args.export}...")
            manager.export_report(args.export, output_format)
            if chatty:
                print("✅ Export erfolgreich abgeschlossen")
        else:
            # Output on the console
            if chatty and args.format == 'text':
                print()
            manager.print_summary(output_format)
        return 0
    except KeyboardInterrupt:
        if chatty:
            print("\n⚠️ Abgebrochen durch Benutzer")
        return 130
    except Exception as e:
        print(f"❌ Fehler: {e}")
        if chatty:
            print("\nHinweise:")
            print("- Stellen Sie sicher, dass SSACLI installiert ist")
            print("- Script als Administrator/Root ausführen")
            print("- Smart Array Controller müssen vorhanden sein")
            print(f"- SSACLI-Pfad prüfen: {args.ssacli_path}")
        return 1
# Legacy function kept for backward compatibility
def legacy_main():
    """Former entry point: print the summary, then save a timestamped JSON report."""
    print("🚀 SSACLI Python Automation Tool")
    print("-" * 40)
    manager = SSACLIManager()
    try:
        # Overview on the console
        manager.print_summary()
        # Detailed report as JSON
        print("\n📋 Generiere detaillierten Report...")
        report = manager.generate_report()
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        report_filename = f"ssacli_report_{stamp}.json"
        with open(report_filename, 'w', encoding='utf-8') as target:
            json.dump(report, target, indent=2, ensure_ascii=False)
        print(f"✅ Report gespeichert: {report_filename}")
    except Exception as e:
        print(f"❌ Fehler: {e}")
        print("\nHinweis: Stellen Sie sicher, dass:")
        print("- SSACLI installiert ist")
        print("- Das Script als Administrator/Root ausgeführt wird")
        print("- Smart Array Controller vorhanden sind")
if __name__ == "__main__":
    # Hand main()'s return value (0, 1 or 130) to the caller as exit status.
    raise SystemExit(main())