1040 lines
42 KiB
Python
1040 lines
42 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
SSACLI Python Automation Tool
|
|
Automatisiert SSACLI-Befehle und parst die Ausgaben für einfache Verwaltung von HP Smart Array Controllern
|
|
"""
|
|
|
|
import subprocess
|
|
import json
|
|
import yaml
|
|
import re
|
|
import argparse
|
|
from typing import Dict, List, Optional, Any
|
|
from dataclasses import dataclass, asdict
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
|
|
class OutputFormat(Enum):
    """Output formats a report can be rendered in."""

    # The values double as the choices accepted by the --format CLI option.
    TEXT = "text"
    JSON = "json"
    YAML = "yaml"
|
|
|
|
@dataclass
class PhysicalDrive:
    """One physical disk attached to a controller."""

    slot: str  # drive address as printed by ssacli, e.g. "1I:1:1"
    status: str  # e.g. "OK"
    size: str  # size text as printed by ssacli, e.g. "300 GB"
    interface: str  # e.g. "SAS"
    model: str
    serial: str
    temperature: Optional[str] = None  # raw "Current Temperature" value, if any
|
|
|
|
@dataclass
class LogicalDrive:
    """One logical drive (volume) exposed by a controller."""

    drive_id: str  # numeric id as text, e.g. "1"
    size: str  # e.g. "279.4 GB"
    raid_level: str  # e.g. "RAID 1"
    status: str  # e.g. "OK"
    # Not populated by any code visible in this file.
    mount_point: Optional[str] = None
|
|
|
|
@dataclass
class Array:
    """One drive array on a controller."""

    array_id: str  # array letter(s), e.g. "A"
    array_type: str
    interface: str  # e.g. "SAS"
    size: str
    unused_space: Optional[str] = None  # e.g. "558.9 GB"
    status: str = "Unknown"
    drive_count: int = 0
    # None is only a placeholder: __post_init__ swaps it for a fresh list so
    # instances never share one mutable default.
    drives: Optional[List[str]] = None

    def __post_init__(self):
        """Replace a missing drive list with a new empty list."""
        if self.drives is None:
            self.drives = []
|
|
|
|
@dataclass
class UnassignedDrive:
    """A physical drive that is not a member of any array."""

    # NOTE(review): no live code path in this file appears to construct this
    # class; get_unassigned_drives builds PhysicalDrive objects - confirm
    # before relying on it.
    slot: str
    size: str
    interface: str
    model: str
    serial: str
|
|
|
|
@dataclass
class Controller:
    """One Smart Array controller."""

    slot: str  # slot number as text, e.g. "0"
    model: str
    serial: str
    cache_status: str
    battery_status: Optional[str] = None
    # Controller discovery in this file always leaves this as None.
    unassigned_space: Optional[str] = None
|
|
|
|
class SSACLIManager:
    """Runs SSACLI commands and turns their text output into Python objects."""

    def __init__(self, ssacli_path: str = "ssacli"):
        """Create a manager bound to one SSACLI executable.

        Args:
            ssacli_path: Path to the SSACLI executable (default: "ssacli").
        """
        # Filled in by discover_controllers().
        self.controllers = []
        self.ssacli_path = ssacli_path
|
|
|
|
def run_command(self, command: str) -> str:
    """Run one SSACLI command and return what it wrote to stdout.

    The command is executed directly (no shell) with a 30 second timeout.

    Args:
        command: SSACLI arguments without the leading executable name,
            e.g. "ctrl all show".

    Returns:
        The command's standard output as text.

    Raises:
        subprocess.CalledProcessError: The command exited with a non-zero
            status; its stdout and stderr are attached to the exception.
        RuntimeError: The command timed out or the executable was not found.
    """
    # Local import keeps this method self-contained; shlex is standard library.
    import shlex

    # ssacli_path is passed as a single argv entry, so a path containing
    # spaces works and nothing in the command is interpreted by a shell.
    argv = [self.ssacli_path, *shlex.split(command)]
    full_command = f"{self.ssacli_path} {command}"

    try:
        result = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired as err:
        raise RuntimeError(f"Timeout beim Ausführen von: {full_command}") from err
    except FileNotFoundError as err:
        # Only reachable because no shell is involved: a shell would report
        # a missing program through its own exit status instead.
        raise RuntimeError(
            f"SSACLI nicht gefunden. Bitte Pfad überprüfen: {self.ssacli_path}"
        ) from err

    if result.returncode != 0:
        raise subprocess.CalledProcessError(
            result.returncode,
            full_command,
            output=result.stdout,
            stderr=result.stderr,
        )
    return result.stdout
|
|
|
|
def discover_controllers(self) -> List[Controller]:
    """Find all Smart Array controllers and read their basic details.

    Parses the output of ``ctrl all show`` and, for every controller line,
    runs ``ctrl slot=<n> show`` to read serial number, cache status and
    battery/capacitor status. The result is also stored in
    ``self.controllers``.

    Returns:
        List of discovered controllers. If any SSACLI call fails the error
        is printed and an empty list is returned (``self.controllers`` is
        then left untouched).
    """
    try:
        output = self.run_command("ctrl all show")
        controllers = []

        for line in output.split('\n'):
            if 'Smart Array' not in line or 'Slot' not in line:
                continue

            # Example: "Smart Array P420i in Slot 0 (Embedded)"
            slot_match = re.search(r'Slot (\d+)', line)
            # The model is the text between "Smart Array" and "in Slot";
            # matching up to the first ")" would drag "in Slot 0 (Embedded"
            # into the model name.
            model_match = re.search(r'Smart Array\s+(.+?)\s+in Slot\b', line)
            if model_match is None:
                # Other layouts: take everything up to the first parenthesis.
                model_match = re.search(r'Smart Array ([^()]+)', line)

            if not (slot_match and model_match):
                continue

            slot = slot_match.group(1)
            model = model_match.group(1).strip()

            # Per-controller details come from a second command.
            detail_output = self.run_command(f"ctrl slot={slot} show")
            serial = self._extract_field(detail_output, "Serial Number")
            cache_status = self._extract_field(detail_output, "Cache Status")
            battery_status = self._extract_field(detail_output, "Battery/Capacitor Status")

            controllers.append(
                Controller(
                    slot=slot,
                    model=model,
                    serial=serial or "N/A",
                    cache_status=cache_status or "Unknown",
                    battery_status=battery_status,
                    unassigned_space=None,  # not determined during discovery
                )
            )

        self.controllers = controllers
        return controllers

    except Exception as e:
        print(f"Fehler beim Entdecken der Controller: {e}")
        return []
|
|
|
|
def get_physical_drives(self, controller_slot: str) -> List[PhysicalDrive]:
    """Return every physical drive attached to a controller.

    Runs ``pd all show`` for the controller and then one ``pd <slot> show``
    per listed drive to read model, serial number and temperature.

    Args:
        controller_slot: Controller slot number.

    Returns:
        List of physical drives; empty if any SSACLI call fails (the error
        is printed, not raised).
    """
    # Example: "physicaldrive 1I:1:1 (port 1I:box 1:bay 1, SAS, 300 GB, OK)"
    line_pattern = r'physicaldrive (\S+).*?\(([^,]+),\s*([^,]+),\s*([^,]+),\s*([^)]+)'

    try:
        listing = self.run_command(f"ctrl slot={controller_slot} pd all show")
        found = []

        for raw_line in listing.split('\n'):
            if 'physicaldrive' not in raw_line.lower():
                continue
            parsed = re.search(line_pattern, raw_line)
            if not parsed:
                continue

            drive_slot = parsed.group(1)
            # Group 2 is the port/box/bay location and is not used.
            interface = parsed.group(3).strip()
            size = parsed.group(4).strip()
            status = parsed.group(5).strip()

            # Model, serial and temperature are only in the per-drive output.
            details = self.run_command(f"ctrl slot={controller_slot} pd {drive_slot} show")
            model = self._extract_field(details, "Model")
            serial = self._extract_field(details, "Serial Number")
            temperature = self._extract_field(details, "Current Temperature")

            found.append(
                PhysicalDrive(
                    slot=drive_slot,
                    status=status,
                    size=size,
                    interface=interface,
                    model=model or "Unknown",
                    serial=serial or "N/A",
                    temperature=temperature,
                )
            )

        return found

    except Exception as e:
        print(f"Fehler beim Abrufen der physischen Laufwerke: {e}")
        return []
|
|
|
|
def get_unassigned_drives(self, controller_slot: str) -> List[PhysicalDrive]:
    """Return the physical drives on a controller that belong to no array.

    Runs ``pd allunassigned show`` for the controller and then one
    ``pd <slot> show`` per listed drive to read model, serial number and
    temperature.

    Args:
        controller_slot: Controller slot number.

    Returns:
        List of unassigned drives as PhysicalDrive objects; empty when none
        are listed or when any SSACLI call fails (the error is printed, not
        raised).
    """
    try:
        output = self.run_command(f"ctrl slot={controller_slot} pd allunassigned show")
        drives = []

        for line in output.split('\n'):
            if 'physicaldrive' not in line.lower():
                continue

            # Example: "physicaldrive 1I:1:1 (port 1I:box 1:bay 1, SAS, 300 GB, OK)"
            match = re.search(r'physicaldrive (\S+).*?\(([^,]+),\s*([^,]+),\s*([^,]+),\s*([^)]+)', line)
            if not match:
                continue

            slot = match.group(1)
            # Group 2 is the port/box/bay location and is not used.
            interface = match.group(3).strip()
            size = match.group(4).strip()
            status = match.group(5).strip()

            # Model, serial and temperature are only in the per-drive output.
            detail_output = self.run_command(f"ctrl slot={controller_slot} pd {slot} show")
            model = self._extract_field(detail_output, "Model")
            serial = self._extract_field(detail_output, "Serial Number")
            temp = self._extract_field(detail_output, "Current Temperature")

            drives.append(
                PhysicalDrive(
                    slot=slot,
                    status=status,
                    size=size,
                    interface=interface,
                    model=model or "Unknown",
                    serial=serial or "N/A",
                    temperature=temp,
                )
            )

        return drives

    except Exception as e:
        print(f"Fehler beim Abrufen der unassigned Laufwerke: {e}")
        return []
|
|
|
|
def get_logical_drives(self, controller_slot: str) -> List[LogicalDrive]:
    """Return every logical drive configured on a controller.

    Args:
        controller_slot: Controller slot number.

    Returns:
        List of logical drives parsed from ``ld all show``; empty if the
        SSACLI call fails (the error is printed, not raised).
    """
    # Example: "logicaldrive 1 (279.4 GB, RAID 1, OK)"
    line_pattern = r'logicaldrive (\d+) \(([^,]+),\s*([^,]+),\s*([^)]+)'

    try:
        listing = self.run_command(f"ctrl slot={controller_slot} ld all show")
        found = []

        for raw_line in listing.split('\n'):
            if 'logicaldrive' not in raw_line.lower():
                continue
            parsed = re.search(line_pattern, raw_line)
            if not parsed:
                continue

            found.append(
                LogicalDrive(
                    drive_id=parsed.group(1),
                    size=parsed.group(2).strip(),
                    raid_level=parsed.group(3).strip(),
                    status=parsed.group(4).strip(),
                )
            )

        return found

    except Exception as e:
        print(f"Fehler beim Abrufen der logischen Laufwerke: {e}")
        return []
|
|
|
|
def get_arrays(self, controller_slot: str) -> List[Array]:
    """Return every array on a controller, including its unused space.

    Parses the array headers from ``array all show`` and runs
    ``array <id> show`` per array for type, status, size and member drives.

    Args:
        controller_slot: Controller slot number.

    Returns:
        List of arrays. An array whose detail query fails is still returned
        with "Unknown" placeholders; if the listing itself fails the error
        is printed and an empty list is returned.
    """
    # Example: "array A (SAS, Unused Space: 558.9 GB)"
    # NOTE(review): header detection is case-sensitive - confirm against
    # real ssacli output.
    header_pattern = r'array ([A-Z]+) \(([^,]+)(?:,\s*Unused Space:\s*([^)]+))?\)'

    try:
        listing = self.run_command(f"ctrl slot={controller_slot} array all show")
        arrays = []

        for raw_line in listing.split('\n'):
            text = raw_line.strip()
            if not (text.startswith('array ') and '(' in text):
                continue
            header = re.search(header_pattern, text)
            if not header:
                continue

            array_id = header.group(1)
            interface = header.group(2).strip()
            unused_space = header.group(3) if header.group(3) else "0 GB"

            try:
                details = self.run_command(f"ctrl slot={controller_slot} array {array_id} show")

                array_type = self._extract_field(details, "Array Type") or "Unknown"
                status = self._extract_field(details, "Status") or "Unknown"

                size_found = re.search(r'Array Size:\s*([^,\n]+)', details)
                size = size_found.group(1).strip() if size_found else "Unknown"

                # One scan yields both the member slots and their count.
                member_slots = re.findall(r'physicaldrive (\d+[A-Z]:\d+:\d+)', details)

                entry = Array(
                    array_id=array_id,
                    array_type=array_type,
                    interface=interface,
                    size=size,
                    unused_space=unused_space,
                    status=status,
                    drive_count=len(member_slots),
                    drives=member_slots,
                )
            except Exception as detail_error:
                print(f"Warnung: Konnte Details für Array {array_id} nicht abrufen: {detail_error}")
                # Fall back to what the header line alone provides.
                entry = Array(
                    array_id=array_id,
                    array_type="Unknown",
                    interface=interface,
                    size="Unknown",
                    unused_space=unused_space,
                    status="Unknown",
                    drive_count=0,
                    drives=[],
                )

            arrays.append(entry)

        return arrays

    except Exception as e:
        print(f"Fehler beim Abrufen der Arrays: {e}")
        return []
|
|
|
|
def calculate_array_unused_space(self, controller_slot: str) -> Dict[str, Any]:
    """Sum the unused space inside existing arrays and suggest LD sizes.

    Args:
        controller_slot: Controller slot number.

    Returns:
        Dictionary with the keys ``total_unused_space_gb`` (float),
        ``total_unused_space`` (formatted text), ``arrays_with_unused_space``
        (count), ``array_details`` (one dict per array) and
        ``possible_logical_drives`` (suggested sizes per array that has free
        space). On any error the message is printed and a zeroed result with
        the same keys is returned.
    """
    # Factors converting each supported unit to GB.
    unit_to_gb = {"MB": 1 / 1024, "GB": 1, "TB": 1024}

    try:
        arrays = self.get_arrays(controller_slot)

        total_unused_gb = 0.0
        array_details = []
        possible_logical_drives = {}

        for array in arrays:
            unused_space_gb = 0.0

            if array.unused_space and array.unused_space != "0 GB":
                unused_match = re.search(
                    r'(\d+(?:\.\d+)?)\s*(MB|GB|TB)', array.unused_space, re.IGNORECASE
                )
                if unused_match:
                    factor = unit_to_gb[unused_match.group(2).upper()]
                    unused_space_gb = float(unused_match.group(1)) * factor
                    total_unused_gb += unused_space_gb

            array_details.append({
                "array_id": array.array_id,
                "array_type": array.array_type,
                "interface": array.interface,
                "total_size": array.size,
                "unused_space": array.unused_space,
                "unused_space_gb": unused_space_gb,
                "status": array.status,
                "drive_count": array.drive_count,
                "drives": array.drives,
            })

            if unused_space_gb <= 0:
                continue

            # Suggest logical drive sizes; the smaller options depend on how
            # much space is free.
            if unused_space_gb >= 1000:
                suggested_sizes = [
                    f"{unused_space_gb:.0f} GB (kompletter verfügbarer Platz)",
                    "500 GB",
                    "100 GB",
                ]
            elif unused_space_gb >= 100:
                suggested_sizes = [
                    f"{unused_space_gb:.0f} GB (kompletter verfügbarer Platz)",
                    "50 GB",
                    "20 GB",
                ]
            else:
                suggested_sizes = [
                    f"{unused_space_gb:.1f} GB (kompletter verfügbarer Platz)"
                ]

            possible_logical_drives[f"Array {array.array_id} ({array.interface})"] = {
                "available_space_gb": unused_space_gb,
                "available_space": f"{unused_space_gb:.1f} GB",
                "suggested_sizes": suggested_sizes,
            }

        return {
            "total_unused_space_gb": total_unused_gb,
            "total_unused_space": f"{total_unused_gb:.1f} GB",
            "arrays_with_unused_space": len([a for a in array_details if a["unused_space_gb"] > 0]),
            "array_details": array_details,
            "possible_logical_drives": possible_logical_drives,
        }

    except Exception as e:
        print(f"Fehler beim Berechnen des ungenutzten Array-Speichers: {e}")
        return {
            "total_unused_space_gb": 0.0,
            "total_unused_space": "0 GB",
            "arrays_with_unused_space": 0,
            "array_details": [],
            "possible_logical_drives": {},
        }
|
|
|
|
def calculate_available_space(self, controller_slot: str) -> Dict[str, Any]:
    """Work out what could be built from the unassigned drives.

    Unassigned drives are grouped by identical size text and interface; for
    each group the RAID levels that the drive count allows are listed with
    their usable capacity.

    Args:
        controller_slot: Controller slot number.

    Returns:
        Dictionary with the keys ``total_unassigned_drives``,
        ``available_space`` (formatted text), ``possible_raids``, ``drives``
        and ``drive_groups``. A drive whose size cannot be parsed is still
        counted and listed under ``drives`` but is left out of the capacity
        figures and groups. On any error the message is printed and an empty
        result with the same keys is returned.
    """
    try:
        unassigned_drives = self.get_unassigned_drives(controller_slot)

        if not unassigned_drives:
            return {
                "total_unassigned_drives": 0,
                "available_space": "0 GB",
                "possible_raids": {},
                "drives": [],
                "drive_groups": {},
            }

        # Group drives by size text and interface.
        drive_groups = {}
        total_space_gb = 0

        for drive in unassigned_drives:
            size_match = re.search(r'(\d+(?:\.\d+)?)\s*(GB|TB)', drive.size, re.IGNORECASE)
            if not size_match:
                # Without a parseable size no capacity can be attributed to
                # this drive, so it is not grouped.
                continue

            size_value = float(size_match.group(1))
            if size_match.group(2).upper() == "TB":
                size_value *= 1024

            group = drive_groups.setdefault(
                f"{drive.size}_{drive.interface}",
                {
                    "size": drive.size,
                    "size_gb": size_value,
                    "interface": drive.interface,
                    "count": 0,
                    "drives": [],
                },
            )
            group["count"] += 1
            group["drives"].append(drive.slot)
            total_space_gb += size_value

        # Work out the RAID levels each group can support.
        possible_raids = {}

        for group in drive_groups.values():
            count = group["count"]
            size_gb = group["size_gb"]
            group_raids = {}

            # RAID 0: at least 2 drives, all capacity usable.
            if count >= 2:
                raid0_capacity = count * size_gb
                group_raids["RAID 0"] = {
                    "drives_needed": count,
                    "capacity_gb": raid0_capacity,
                    "description": f"Alle {count} Laufwerke, {raid0_capacity:.1f} GB nutzbar",
                }

            # RAID 1: mirrored pairs; an odd drive is left over.
            if count >= 2:
                pairs = count // 2
                raid1_capacity = pairs * size_gb
                group_raids["RAID 1"] = {
                    "drives_needed": pairs * 2,
                    "capacity_gb": raid1_capacity,
                    "description": f"{pairs} Paar(e), {raid1_capacity:.1f} GB nutzbar",
                }

            # RAID 5: at least 3 drives, one drive's worth of parity.
            if count >= 3:
                raid5_capacity = (count - 1) * size_gb
                group_raids["RAID 5"] = {
                    "drives_needed": count,
                    "capacity_gb": raid5_capacity,
                    "description": f"{count} Laufwerke, {raid5_capacity:.1f} GB nutzbar",
                }

            # RAID 6: at least 4 drives, two drives' worth of parity.
            if count >= 4:
                raid6_capacity = (count - 2) * size_gb
                group_raids["RAID 6"] = {
                    "drives_needed": count,
                    "capacity_gb": raid6_capacity,
                    "description": f"{count} Laufwerke, {raid6_capacity:.1f} GB nutzbar",
                }

            # RAID 10: at least 4 drives and an even count.
            if count >= 4 and count % 2 == 0:
                raid10_capacity = (count // 2) * size_gb
                group_raids["RAID 10"] = {
                    "drives_needed": count,
                    "capacity_gb": raid10_capacity,
                    "description": f"{count} Laufwerke, {raid10_capacity:.1f} GB nutzbar",
                }

            if group_raids:
                possible_raids[f"{group['size']} {group['interface']} ({count}x)"] = group_raids

        return {
            "total_unassigned_drives": len(unassigned_drives),
            "available_space": f"{total_space_gb:.1f} GB",
            "possible_raids": possible_raids,
            "drives": [
                {
                    "slot": drive.slot,
                    "size": drive.size,
                    "interface": drive.interface,
                    "model": drive.model,
                    "serial": drive.serial,
                }
                for drive in unassigned_drives
            ],
            "drive_groups": drive_groups,
        }

    except Exception as e:
        print(f"Fehler beim Berechnen des verfügbaren Speichers: {e}")
        return {
            "total_unassigned_drives": 0,
            "available_space": "0 GB",
            "possible_raids": {},
            "drives": [],
            "drive_groups": {},
        }
|
|
|
|
def create_raid_array(self, controller_slot: str, raid_level: str, drives: List[str]) -> bool:
    """Create a new logical drive from the given physical drives.

    Args:
        controller_slot: Controller slot number.
        raid_level: RAID level (0, 1, 5, 6, 10).
        drives: Physical drive slots to use.

    Returns:
        True when the command output contains "Success" or "OK"; False
        otherwise, including when the command fails (the error is printed).
    """
    # NOTE(review): success is inferred from the output text only - confirm
    # what ssacli actually prints for a successful create.
    try:
        joined_drives = ",".join(drives)
        reply = self.run_command(
            f"ctrl slot={controller_slot} create type=ld drives={joined_drives} raid={raid_level}"
        )
        if "Success" in reply:
            return True
        return "OK" in reply
    except Exception as e:
        print(f"Fehler beim Erstellen des RAID-Arrays: {e}")
        return False
|
|
|
|
def generate_report(self) -> Dict[str, Any]:
    """Collect everything known about all controllers into one dictionary.

    Returns:
        Dictionary with a ``timestamp`` (ISO format, taken before discovery
        starts) and a ``controllers`` list. Each entry holds the controller
        fields, its physical and logical drives, the unassigned-drive
        analysis and the unused-array-space analysis.
    """
    report = {
        "timestamp": datetime.now().isoformat(),
        "controllers": [],
    }

    for ctrl in self.discover_controllers():
        entry = {
            "controller": {
                "slot": ctrl.slot,
                "model": ctrl.model,
                "serial": ctrl.serial,
                "cache_status": ctrl.cache_status,
                "battery_status": ctrl.battery_status,
            },
            "physical_drives": [],
            "logical_drives": [],
        }

        entry["physical_drives"].extend(
            {
                "slot": drive.slot,
                "status": drive.status,
                "size": drive.size,
                "interface": drive.interface,
                "model": drive.model,
                "serial": drive.serial,
                "temperature": drive.temperature,
            }
            for drive in self.get_physical_drives(ctrl.slot)
        )

        entry["logical_drives"].extend(
            {
                "drive_id": volume.drive_id,
                "size": volume.size,
                "raid_level": volume.raid_level,
                "status": volume.status,
            }
            for volume in self.get_logical_drives(ctrl.slot)
        )

        # Space on unassigned drives, then space left inside existing arrays.
        entry["available_space"] = self.calculate_available_space(ctrl.slot)
        entry["array_unused_space"] = self.calculate_array_unused_space(ctrl.slot)

        report["controllers"].append(entry)

    return report
|
|
|
|
def print_summary(self, output_format: OutputFormat = OutputFormat.TEXT):
    """Print a summary of all system information in the requested format.

    Args:
        output_format: Desired output format (TEXT, JSON, YAML).

    Raises:
        ValueError: ``output_format`` is none of the known formats.
    """
    renderers = (
        (OutputFormat.TEXT, self._print_text_summary),
        (OutputFormat.JSON, self._print_json_summary),
        (OutputFormat.YAML, self._print_yaml_summary),
    )
    for known_format, render in renderers:
        if output_format == known_format:
            render()
            return
    raise ValueError(f"Unbekanntes Ausgabeformat: {output_format}")
|
|
|
|
def _extract_field(self, detail_output: str, field: str) -> Optional[str]:
    """Return the value of a ``Name: value`` line in SSACLI detail output.

    Example: for a line ``Serial Number: PZXND0BRHH68OI`` and
    ``field="Serial Number"`` the result is ``"PZXND0BRHH68OI"``.

    Args:
        detail_output: Text printed by an SSACLI ``show`` command.
        field: Exact (case-sensitive) field name to look for.

    Returns:
        The text after the first colon of the first matching line, with
        surrounding whitespace removed, or None when no line matches or
        ``detail_output`` is not text.
    """
    if not isinstance(detail_output, str):
        return None

    for line in detail_output.splitlines():
        # Split on the first colon only, so values that contain colons
        # themselves (e.g. "1I:1:1") are returned whole.
        name, separator, value = line.partition(':')
        if separator and name.strip() == field:
            return value.strip()
    return None
|
|
|
|
def _print_text_summary(self):
    """Print a human-readable overview of all controllers to stdout."""

    def mark(status):
        # Check mark for "OK", cross for every other status.
        return "✅" if status == "OK" else "❌"

    banner = "=" * 80
    print(banner)
    print("HP Smart Array Controller Zusammenfassung")
    print(banner)

    found_controllers = self.discover_controllers()
    if not found_controllers:
        print("Keine Smart Array Controller gefunden!")
        return

    for ctrl in found_controllers:
        print(f"\n📊 Controller Slot {ctrl.slot}: {ctrl.model}")
        print(f" Serial: {ctrl.serial}")
        print(f" Cache Status: {ctrl.cache_status}")
        if ctrl.battery_status:
            print(f" Battery Status: {ctrl.battery_status}")

        # Physical drives
        disks = self.get_physical_drives(ctrl.slot)
        print(f"\n💾 Physische Laufwerke ({len(disks)}):")
        for disk in disks:
            temp_info = f" ({disk.temperature})" if disk.temperature else ""
            print(f" {mark(disk.status)} {disk.slot}: {disk.size} {disk.interface} - {disk.status}{temp_info}")

        # Logical drives
        volumes = self.get_logical_drives(ctrl.slot)
        print(f"\n🔗 Logische Laufwerke ({len(volumes)}):")
        for volume in volumes:
            print(f" {mark(volume.status)} LD {volume.drive_id}: {volume.size} {volume.raid_level} - {volume.status}")

        # Unassigned drives and what could be built from them
        available_info = self.calculate_available_space(ctrl.slot)
        unassigned_count = available_info["total_unassigned_drives"]

        if unassigned_count > 0:
            print(f"\n💾 Nicht zugewiesener Speicher:")
            print(f" 📦 {unassigned_count} Laufwerk(e) verfügbar ({available_info['available_space']})")

            for drive in available_info["drives"]:
                print(f" 🔸 {drive['slot']}: {drive['size']} {drive['interface']} - {drive['model']}")

            if available_info["possible_raids"]:
                print(f"\n🛠️ Mögliche RAID-Konfigurationen:")
                for group_name, raids in available_info["possible_raids"].items():
                    print(f" 📋 {group_name}:")
                    for raid_type, raid_info in raids.items():
                        print(f" • {raid_type}: {raid_info['description']}")

        # Space left inside existing arrays
        array_unused_info = self.calculate_array_unused_space(ctrl.slot)
        total_unused = array_unused_info["total_unused_space_gb"]
        arrays_with_unused = array_unused_info["arrays_with_unused_space"]

        if total_unused > 0:
            print(f"\n🔧 Ungenutzter Array-Speicher:")
            print(f" 📊 {arrays_with_unused} Array(s) mit ungenutztem Platz ({array_unused_info['total_unused_space']})")

            for array_info in array_unused_info["array_details"]:
                if array_info["unused_space_gb"] <= 0:
                    continue
                print(f" 🔸 Array {array_info['array_id']}: {array_info['unused_space']} verfügbar")
                print(f" └─ {array_info['interface']}, {array_info['drive_count']} Laufwerke, Status: {array_info['status']}")

            if array_unused_info["possible_logical_drives"]:
                print(f"\n💿 Mögliche neue logische Laufwerke:")
                for array_name, ld_info in array_unused_info["possible_logical_drives"].items():
                    print(f" 📋 {array_name} ({ld_info['available_space']} verfügbar):")
                    for size in ld_info["suggested_sizes"]:
                        print(f" • {size}")

        if unassigned_count == 0 and total_unused == 0:
            print(f"\n💾 Speicher-Status: Alle verfügbaren Laufwerke sind vollständig genutzt ✅")
|
|
|
|
def _print_json_summary(self):
    """Print the full report to stdout as indented JSON."""
    # ensure_ascii=False keeps umlauts and other non-ASCII text readable.
    rendered = json.dumps(self.generate_report(), indent=2, ensure_ascii=False)
    print(rendered)
|
|
|
|
def _print_yaml_summary(self):
    """Print the full report to stdout as block-style YAML."""
    # sort_keys=False keeps the report's own key order.
    rendered = yaml.dump(
        self.generate_report(),
        default_flow_style=False,
        allow_unicode=True,
        sort_keys=False,
    )
    print(rendered)
|
|
|
|
def export_report(self, filename: str, output_format: OutputFormat):
    """Write the full report to a file in the requested format.

    Args:
        filename: Target file name; an existing file is overwritten.
        output_format: Desired output format (TEXT, JSON, YAML).

    Returns:
        True when the file was written, False when the format is not
        supported or writing failed (the error is printed, not raised).
    """
    # Reject unknown formats before touching the file system; otherwise an
    # empty file would be created and reported as a successful export.
    if output_format not in (OutputFormat.JSON, OutputFormat.YAML, OutputFormat.TEXT):
        print(f"❌ Fehler beim Exportieren: Unbekanntes Ausgabeformat: {output_format}")
        return False

    report = self.generate_report()

    try:
        with open(filename, 'w', encoding='utf-8') as f:
            if output_format == OutputFormat.JSON:
                json.dump(report, f, indent=2, ensure_ascii=False)

            elif output_format == OutputFormat.YAML:
                yaml.dump(report, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

            else:
                # Plain-text rendering of the same report dictionary.
                f.write("=" * 80 + "\n")
                f.write("HP Smart Array Controller Bericht\n")
                f.write(f"Generiert: {report['timestamp']}\n")
                f.write("=" * 80 + "\n\n")

                for controller_data in report["controllers"]:
                    controller = controller_data["controller"]
                    f.write(f"Controller Slot {controller['slot']}: {controller['model']}\n")
                    f.write(f" Serial: {controller['serial']}\n")
                    f.write(f" Cache Status: {controller['cache_status']}\n")
                    if controller.get('battery_status'):
                        f.write(f" Battery Status: {controller['battery_status']}\n")

                    f.write(f"\n Physische Laufwerke ({len(controller_data['physical_drives'])}):\n")
                    for drive in controller_data["physical_drives"]:
                        f.write(f" {drive['slot']}: {drive['size']} {drive['interface']} - {drive['status']}\n")

                    f.write(f"\n Logische Laufwerke ({len(controller_data['logical_drives'])}):\n")
                    for ld in controller_data["logical_drives"]:
                        f.write(f" LD {ld['drive_id']}: {ld['size']} {ld['raid_level']} - {ld['status']}\n")

                    # Unassigned drives
                    available = controller_data.get("available_space", {})
                    if available.get("total_unassigned_drives", 0) > 0:
                        f.write(f"\n Nicht zugewiesener Speicher: {available['available_space']}\n")
                        for drive in available.get("drives", []):
                            f.write(f" {drive['slot']}: {drive['size']} {drive['interface']}\n")

                    # Unused space inside arrays
                    array_unused = controller_data.get("array_unused_space", {})
                    if array_unused.get("total_unused_space_gb", 0) > 0:
                        f.write(f"\n Ungenutzter Array-Speicher: {array_unused['total_unused_space']}\n")
                        for array_info in array_unused.get("array_details", []):
                            if array_info.get("unused_space_gb", 0) > 0:
                                f.write(f" Array {array_info['array_id']}: {array_info['unused_space']}\n")

                    f.write("\n" + "-" * 60 + "\n")

        print(f"✅ Bericht exportiert: {filename}")
        return True

    except Exception as e:
        # Deliberate catch-all: an export problem is reported, never raised.
        print(f"❌ Fehler beim Exportieren: {e}")
        return False
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Parses the arguments, then either exports a report to a file
    (``--export``) or prints a summary to the console.

    When ``--format`` is not given, the format of an export follows the
    file extension (``.json``, ``.yaml``/``.yml``); everything else uses
    text.

    Returns:
        Process exit status: 0 on success, 1 on error, 130 when interrupted
        with Ctrl+C.
    """
    parser = argparse.ArgumentParser(
        description="SSACLI Python Automation Tool - Verwaltung von HP Smart Array Controllern",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Beispiele:
python3 ssacli_tool.py # Standard Text-Ausgabe
python3 ssacli_tool.py --format json # JSON-Ausgabe
python3 ssacli_tool.py --format yaml # YAML-Ausgabe
python3 ssacli_tool.py --export report.json # JSON-Export in Datei
python3 ssacli_tool.py --export report.yaml --format yaml # YAML-Export
python3 ssacli_tool.py --ssacli-path /usr/sbin/ssacli # Alternativer SSACLI-Pfad

Hinweis: Das Script muss als Administrator/Root ausgeführt werden.
""",
    )

    parser.add_argument(
        '--format', '-f',
        type=str,
        choices=['text', 'json', 'yaml'],
        # None means "not given", which allows inferring the format from the
        # export file name below.
        default=None,
        help='Ausgabeformat (Standard: text; bei --export anhand der Dateiendung)',
    )
    parser.add_argument(
        '--export', '-e',
        type=str,
        help='Exportiere Bericht in angegebene Datei',
    )
    parser.add_argument(
        '--ssacli-path',
        type=str,
        default='ssacli',
        help='Pfad zur SSACLI-Executable (Standard: ssacli)',
    )
    parser.add_argument(
        '--quiet', '-q',
        action='store_true',
        help='Unterdrücke Statusmeldungen (nur Ausgabe/Export)',
    )
    parser.add_argument(
        '--version', '-v',
        action='version',
        version='SSACLI Python Tool v2.0',
    )

    args = parser.parse_args()

    # Resolve the output format. argparse already restricts --format to the
    # known choices, so only the "not given" case needs handling here.
    format_name = args.format
    if format_name is None:
        export_target = (args.export or "").lower()
        if export_target.endswith(".json"):
            format_name = "json"
        elif export_target.endswith((".yaml", ".yml")):
            format_name = "yaml"
        else:
            format_name = "text"
    output_format = OutputFormat(format_name)

    if not args.quiet:
        print("🚀 SSACLI Python Automation Tool v2.0")
        if format_name != 'text':
            print(f"📋 Ausgabeformat: {format_name.upper()}")
        print("-" * 50)

    ssacli = SSACLIManager(args.ssacli_path)

    try:
        if args.export:
            # Export to a file
            if not args.quiet:
                print(f"📤 Exportiere Bericht nach {args.export}...")

            outcome = ssacli.export_report(args.export, output_format)
            # Only an explicit False counts as failure, so an export_report
            # that returns nothing keeps the previous always-success result.
            if outcome is False:
                return 1

            if not args.quiet:
                print("✅ Export erfolgreich abgeschlossen")
        else:
            # Print to the console
            if not args.quiet and format_name == 'text':
                print()

            ssacli.print_summary(output_format)

        return 0

    except KeyboardInterrupt:
        if not args.quiet:
            print("\n⚠️ Abgebrochen durch Benutzer")
        return 130

    except Exception as e:
        print(f"❌ Fehler: {e}")
        if not args.quiet:
            print("\nHinweise:")
            print("- Stellen Sie sicher, dass SSACLI installiert ist")
            print("- Script als Administrator/Root ausführen")
            print("- Smart Array Controller müssen vorhanden sein")
            print(f"- SSACLI-Pfad prüfen: {args.ssacli_path}")
        return 1
|
|
|
|
|
|
# Legacy function kept for backward compatibility
def legacy_main():
    """Old entry point: print the text summary and save a JSON report.

    The report is written to ``ssacli_report_<YYYYmmdd_HHMMSS>.json`` in the
    current directory. Errors are printed together with a few hints instead
    of being raised.
    """
    print("🚀 SSACLI Python Automation Tool")
    print("-" * 40)

    manager = SSACLIManager()

    try:
        # Overview first ...
        manager.print_summary()

        # ... then the detailed report as a JSON file.
        print("\n📋 Generiere detaillierten Report...")
        full_report = manager.generate_report()

        report_filename = f"ssacli_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_filename, 'w', encoding='utf-8') as handle:
            json.dump(full_report, handle, indent=2, ensure_ascii=False)

        print(f"✅ Report gespeichert: {report_filename}")

    except Exception as e:
        print(f"❌ Fehler: {e}")
        for hint in (
            "\nHinweis: Stellen Sie sicher, dass:",
            "- SSACLI installiert ist",
            "- Das Script als Administrator/Root ausgeführt wird",
            "- Smart Array Controller vorhanden sind",
        ):
            print(hint)
|
|
|
|
|
|
if __name__ == "__main__":
    # Hand main()'s return value to the shell as the process exit status;
    # a bare main() call would always exit with status 0.
    raise SystemExit(main())