Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tensilelite/Tensile/Common.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
# common
########################################
globalParameters["MinimumRequiredVersion"] = "0.0.0" # which version of tensile is required to handle all the features required by this configuration file
globalParameters["DeviceList"] = [] # list of GPU devices to use for tuning, empty=use default, [-1]=use all available
globalParameters["PerformanceMetric"] = "DeviceEfficiency" # performance metric for benchmarking; one of {DeviceEfficiency, CUEfficiency}
globalParameters["PrintLevel"] = 1 # how much info to print in generator. 0=none, 1=standard, 2=verbose
globalParameters["PrintTiming"] = False # print duration for each stage in generator.
Expand Down
374 changes: 374 additions & 0 deletions tensilelite/Tensile/bin/TensileParallel
Original file line number Diff line number Diff line change
@@ -0,0 +1,374 @@
#!/usr/bin/env python3

import yaml
import subprocess
import os
import sys
import time
import ctypes
from ctypes import c_int, byref, CDLL
from multiprocessing import Pool
import numpy as np
import logging
from typing import List, Dict, Tuple, Any, Optional
import signal
import json
from dataclasses import dataclass
from contextlib import contextmanager

# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('tensile_parallel.log')
]
)
logger = logging.getLogger(__name__)

@dataclass
class DeviceInfo:
"""Store device information and workload state"""
id: int
workload: float = 0.0
sizes: List[Any] = None

def __post_init__(self):
if self.sizes is None:
self.sizes = []

class DeviceError(Exception):
"""Custom exception for device-related errors"""
pass

def get_available_devices() -> List[int]:
"""Get available GPU devices with comprehensive error handling"""
try:
count = c_int()
hip_lib = CDLL("libamdhip64.so")
result = hip_lib.hipGetDeviceCount(byref(count))

if result == 0: # hipSuccess
# Verify each device is actually accessible
available_devices = []
for device_id in range(count.value):
if hip_lib.hipSetDevice(device_id) == 0:
available_devices.append(device_id)
return available_devices
elif result == 1: # hipErrorNoDevice
logger.warning("No GPU devices found")
return []
else:
raise DeviceError(f"hipGetDeviceCount failed with error code {result}")
except Exception as e:
logger.error(f"Error accessing GPU devices: {str(e)}")
raise

def estimate_complexity(size: Any) -> float:
"""Calculate workload complexity with improved accuracy"""
if isinstance(size, dict):
if 'Exact' in size:
return float(np.prod(size['Exact']))
elif 'Range' in size:
range_vals = size['Range']
return float(np.prod([(end - start) / step for start, end, step in range_vals]))
return 1.0

class TensileParallel:
def __init__(self, input_file: str, output_dir: str):
self.input_file = input_file
self.output_dir = output_dir
self.config = self._load_config()
self.devices = self._initialize_devices()

def _load_config(self) -> dict:
"""Load and validate YAML configuration"""
try:
with open(self.input_file, 'r') as f:
config = yaml.safe_load(f)

if not config.get('BenchmarkProblems'):
raise ValueError("Missing BenchmarkProblems in configuration")

return config
except Exception as e:
logger.error(f"Failed to load configuration: {str(e)}")
raise

def _should_use_standard_tensile(self) -> bool:
"""
Determine whether to use standard Tensile execution based on device configuration.

Returns:
bool: True if should use standard Tensile, False if should use parallel execution

Logic:
1. If DeviceList is empty/not specified -> use standard
2. If DeviceList contains -1 -> use all available devices
3. If DeviceList contains invalid entries (non-numeric, negative except -1) -> use standard
4. If specified devices exist -> use those devices
5. If no specified devices are available -> use standard
"""
device_list = self.config.get('GlobalParameters', {}).get('DeviceList', [])

# Case 1: No devices specified
if not device_list:
logger.info("No devices specified in DeviceList, falling back to standard Tensile")
return True

try:
# Convert all entries to integers
device_list = [int(dev) for dev in device_list]

# Case 2: Use all available devices if -1 is specified
if -1 in device_list:
available_devices = get_available_devices()
if not available_devices:
logger.warning("No GPU devices available despite -1 in DeviceList")
return True
logger.info(f"Found {len(available_devices)} available devices due to -1 in DeviceList")
return False

# Case 3: Check for invalid entries (negative numbers except -1)
if any(dev < 0 for dev in device_list):
logger.warning("DeviceList contains invalid negative values")
return True

# Case 4 & 5: Validate specified devices
available_devices = get_available_devices()
valid_devices = [dev for dev in device_list if dev in available_devices]

if not valid_devices:
logger.warning("None of the specified devices in DeviceList are available")
return True

logger.info(f"Using specified devices: {valid_devices}")
return False

except (ValueError, TypeError):
# Handle non-numeric entries
logger.warning("DeviceList contains non-numeric values")
return True

def _initialize_devices(self) -> List[DeviceInfo]:
"""Initialize device information with optimal device selection"""
if self._should_use_standard_tensile():
logger.info("DeviceList contains -1, falling back to standard Tensile execution")
return []

available_devices = get_available_devices()
if not available_devices:
raise DeviceError("No available GPU devices found")

configured_devices = self.config.get('GlobalParameters', {}).get('DeviceList', [])
if configured_devices:
device_list = [dev for dev in configured_devices if dev in available_devices]
if not device_list:
logger.warning("No specified devices are available. Using all available devices.")
device_list = available_devices
else:
device_list = available_devices

logger.info(f"Using devices: {device_list}")
return [DeviceInfo(id=device_id) for device_id in device_list]

def _distribute_workload(self) -> None:
"""Distribute problem sizes using simple greedy approach for load balancing"""
# Get problem sizes from config
problem_sizes = self.config['BenchmarkProblems'][0][1]['BenchmarkFinalParameters'][0]['ProblemSizes']

# Calculate complexity for each problem size and sort by complexity (largest first)
workloads = [(size, estimate_complexity(size)) for size in problem_sizes]
workloads.sort(key=lambda x: x[1], reverse=True)

logger.info(f"Total number of problems to distribute: {len(workloads)}")

# Distribute each problem to the device with the lowest current workload
for size, complexity in workloads:
# Find device with minimum current workload
target_device = min(self.devices, key=lambda d: d.workload)
target_device.sizes.append(size)
target_device.workload += complexity

logger.debug(f"Assigned problem {size} (complexity: {complexity:.2e}) to device {target_device.id}")

# Log final distribution summary
for device in self.devices:
logger.info(f"Device {device.id}: {len(device.sizes)} problems, "
f"total workload: {device.workload:.2e}")

def _generate_device_configs(self) -> None:
"""Generate device-specific configurations"""
os.makedirs(self.output_dir, exist_ok=True)

for device in self.devices:
if not device.sizes: # Skip devices with no assigned workload
continue

device_config = self.config.copy()
device_config['GlobalParameters']['Device'] = device.id
device_config['BenchmarkProblems'][0][1]['BenchmarkFinalParameters'][0]['ProblemSizes'] = device.sizes

config_path = os.path.join(self.output_dir, f'config_gpu_{device.id}.yaml')
with open(config_path, 'w') as f:
yaml.dump(device_config, f)

@staticmethod
def _run_tensile_process(args: Tuple[int, str, str]) -> Tuple[int, str, float]:
"""Execute Tensile process with comprehensive monitoring and error handling"""
device_id, output_dir, tensile_path = args
config_file = os.path.join(output_dir, f'config_gpu_{device_id}.yaml')
output_subdir = os.path.join(output_dir, f'outputs/gpu_{device_id}')
os.makedirs(output_subdir, exist_ok=True)

start_time = time.time()
process = None

try:
command = f'{tensile_path} {config_file} {output_subdir}'
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
bufsize=1 # Line buffered
)

# Monitor process output
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
logger.info(f"[Device {device_id}] {output.strip()}")

return_code = process.wait()
if return_code != 0:
raise subprocess.CalledProcessError(return_code, command)

except Exception as e:
logger.error(f"Error on device {device_id}: {str(e)}")
if process:
process.terminate()
raise
finally:
execution_time = time.time() - start_time
logger.info(f"Execution time for device {device_id}: {execution_time:.2f} seconds")

return device_id, output_subdir, execution_time

def _run_standard_tensile(self) -> None:
"""Execute standard Tensile command with monitoring"""
logger.info("Running standard Tensile execution...")
try:
tensile_path = os.path.join(os.path.dirname(sys.argv[0]), 'Tensile')
command = f'{tensile_path} {self.input_file} {self.output_dir}'

process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
)

for line in process.stdout:
logger.info(f"[Standard Tensile] {line.strip()}")

return_code = process.wait()
if return_code != 0:
raise subprocess.CalledProcessError(return_code, command)

logger.info("Standard Tensile execution completed successfully")

except Exception as e:
logger.error(f"Standard Tensile execution failed: {str(e)}")
raise

def _merge_results(self, device_order: List[int]) -> str:
"""Merge results with improved error handling"""
merge_script = '/src/hipBLASLt/tensilelite/Tensile/Utilities/merge.py'
input_dirs = [f'{self.output_dir}/outputs/gpu_{device}/3_LibraryLogic' for device in device_order]
merged_output = f'{self.output_dir}/merged_output'

os.makedirs(merged_output, exist_ok=True)

try:
for i in range(1, len(input_dirs)):
original_dir = input_dirs[0] if i == 1 else merged_output
incremental_dir = input_dirs[i]

command = f'python {merge_script} {original_dir} {incremental_dir} {merged_output}'
subprocess.run(command, shell=True, check=True)

logger.info(f"Results successfully merged to: {merged_output}")
return merged_output
except Exception as e:
logger.error(f"Error merging results: {str(e)}")
raise

def _save_execution_summary(self, results: List[Tuple[int, str, float]]) -> None:
"""Save execution summary for analysis"""
summary = {
'total_devices': len(self.devices),
'execution_times': {device_id: time for device_id, _, time in results},
'total_time': max(time for _, _, time in results),
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
}

summary_path = os.path.join(self.output_dir, 'execution_summary.json')
with open(summary_path, 'w') as f:
json.dump(summary, f, indent=2)

def run(self) -> None:
"""Execute parallel tuning with comprehensive error handling"""
try:
# Check if we should use standard Tensile
if self._should_use_standard_tensile():
self._run_standard_tensile()
return

# Continue with parallel execution
self._distribute_workload()
self._generate_device_configs()

tensile_path = os.path.join(os.path.dirname(sys.argv[0]), 'Tensile')
active_devices = [device for device in self.devices if device.sizes]

# Execute parallel processes using Pool for better resource management
with Pool(processes=len(active_devices)) as pool:
tasks = [(device.id, self.output_dir, tensile_path) for device in active_devices]
results = pool.map(self._run_tensile_process, tasks)

# Sort results by execution time for efficient merging
results.sort(key=lambda x: x[2])

# Merge results and save summary
if results:
final_output = self._merge_results([result[0] for result in results])
self._save_execution_summary(results)
logger.info(f"Final merged output: {final_output}")
logger.info("Tuning completed successfully")

except Exception as e:
logger.error(f"Parallel tuning failed: {str(e)}")
logger.warning("Attempting fallback to standard Tensile execution...")
self._run_standard_tensile()

def main():
"""Main entry point with argument validation"""
if len(sys.argv) != 3:
logger.error("Usage: TensileParallel <config.yaml> <output_path>")
sys.exit(1)

try:
tuner = TensileParallel(sys.argv[1], sys.argv[2])
tuner.run()
except Exception as e:
logger.error(f"Fatal error: {str(e)}")
sys.exit(1)

if __name__ == '__main__':
main()