add exceptions
This commit is contained in:
@ -2,112 +2,176 @@ import os
|
||||
import shutil
|
||||
import yaml
|
||||
import re
|
||||
import sys
|
||||
|
||||
def loadConfig(config_path):
|
||||
def loadConfig(configPath):
|
||||
"""Load YAML configuration."""
|
||||
print(f"Loading configuration from {config_path}")
|
||||
with open(config_path, 'r', encoding='utf-8') as file:
|
||||
config = yaml.safe_load(file)
|
||||
print("Configuration loaded successfully.")
|
||||
return config
|
||||
try:
|
||||
print(f"Loading configuration from {configPath}")
|
||||
with open(configPath, 'r', encoding='utf-8') as file:
|
||||
config = yaml.safe_load(file)
|
||||
|
||||
# validate configuration
|
||||
if not config:
|
||||
raise ValueError("Empty configuration file")
|
||||
|
||||
print("Configuration loaded successfully.")
|
||||
return config
|
||||
except FileNotFoundError:
|
||||
print(f"Error: Configuration file not found at {configPath}")
|
||||
sys.exit(1)
|
||||
except yaml.YAMLError as e:
|
||||
print(f"Error parsing YAML configuration: {e}")
|
||||
sys.exit(1)
|
||||
except ValueError as e:
|
||||
print(f"Configuration error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def ensureDirectory(path):
|
||||
"""Ensure that a directory exists."""
|
||||
print(f"Checking/creating directory: {path}")
|
||||
os.makedirs(path, exist_ok=True)
|
||||
|
||||
def copySources(config, destination_directory):
|
||||
|
||||
def copySources(config, destinationDirectory, results):
|
||||
"""Copy files and folders according to copy rules."""
|
||||
print("Starting source file/folder copy process...")
|
||||
for rule in config.get('copy_rules', []):
|
||||
for source in rule.get('sources', []):
|
||||
# Distinguish between sources with explicit target and without
|
||||
if isinstance(source, dict):
|
||||
src_path = source['source']
|
||||
target_path = os.path.join(destination_directory, source['target'])
|
||||
else:
|
||||
src_path = source
|
||||
target_path = os.path.join(destination_directory, os.path.basename(src_path))
|
||||
try:
|
||||
# Distinguish between sources with explicit target and without
|
||||
if isinstance(source, dict):
|
||||
sourcePath = source['source']
|
||||
targetPath = os.path.join(destinationDirectory, source['target'])
|
||||
else:
|
||||
sourcePath = source
|
||||
targetPath = os.path.join(destinationDirectory, os.path.basename(sourcePath))
|
||||
|
||||
# Create target directory
|
||||
ensureDirectory(os.path.dirname(targetPath))
|
||||
|
||||
# Copy or replace mode
|
||||
if rule.get('mode') == 'replace' and os.path.exists(targetPath):
|
||||
print(f"Replacing existing path: {targetPath}")
|
||||
|
||||
# Explicitly handle directory or file deletion
|
||||
if os.path.isdir(targetPath):
|
||||
shutil.rmtree(targetPath)
|
||||
else:
|
||||
os.remove(targetPath)
|
||||
|
||||
# Update mode
|
||||
if rule.get('mode') == 'update' and os.path.exists(targetPath):
|
||||
print(f"Checking if {sourcePath} is newer than {targetPath}")
|
||||
srcMtime = os.path.getmtime(sourcePath)
|
||||
destMtime = os.path.getmtime(targetPath)
|
||||
if srcMtime <= destMtime:
|
||||
print(f"Skipping {sourcePath}: Destination is up to date")
|
||||
break # Skip this source file/folder
|
||||
|
||||
# Copy files or directories
|
||||
if os.path.isdir(sourcePath):
|
||||
print(f"Copying directory: {sourcePath} -> {targetPath}")
|
||||
shutil.copytree(sourcePath, targetPath)
|
||||
results['copies'] += 1
|
||||
else:
|
||||
print(f"Copying file: {sourcePath} -> {targetPath}")
|
||||
shutil.copy2(sourcePath, targetPath)
|
||||
results['copies'] += 1
|
||||
except PermissionError:
|
||||
print(f"Error: Permission denied when copying {sourcePath}")
|
||||
except OSError as e:
|
||||
print(f"Error copying {sourcePath}: {e}")
|
||||
|
||||
# Create target directory
|
||||
ensureDirectory(os.path.dirname(target_path))
|
||||
|
||||
# Copy or replace mode
|
||||
if rule.get('mode') == 'replace' and os.path.exists(target_path):
|
||||
print(f"Replacing existing path: {target_path}")
|
||||
shutil.rmtree(target_path) if os.path.isdir(target_path) else os.remove(target_path)
|
||||
|
||||
# Copy files or directories
|
||||
if os.path.isdir(src_path):
|
||||
print(f"Copying directory: {src_path} -> {target_path}")
|
||||
shutil.copytree(src_path, target_path)
|
||||
else:
|
||||
print(f"Copying file: {src_path} -> {target_path}")
|
||||
shutil.copy2(src_path, target_path)
|
||||
print("Source file/folder copy process completed.")
|
||||
|
||||
def modifyFiles(config, destination_directory):
|
||||
|
||||
def modifyFiles(config, destinationDirectory, results):
|
||||
"""Modify files according to modification rules."""
|
||||
print("Starting file modification process...")
|
||||
for rule in config.get('modification_rules', []):
|
||||
file_pattern = rule.get('file_pattern', '*')
|
||||
print(f"Processing files matching pattern: {file_pattern}")
|
||||
filePattern = rule.get('file_pattern', '*')
|
||||
print(f"Processing files matching pattern: {filePattern}")
|
||||
|
||||
# Recursively search the destination directory
|
||||
for root, _, files in os.walk(destination_directory):
|
||||
for root, _, files in os.walk(destinationDirectory):
|
||||
for filename in files:
|
||||
if re.match(file_pattern, filename):
|
||||
file_path = os.path.join(root, filename)
|
||||
print(f"Modifying file: {file_path}")
|
||||
|
||||
# Read file content
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
# Perform text replacements
|
||||
for insert_rule in rule.get('insert_rules', []):
|
||||
if 'after_text' in insert_rule:
|
||||
print(f" Inserting text after: {insert_rule['after_text']}")
|
||||
content = content.replace(
|
||||
insert_rule['after_text'],
|
||||
insert_rule['after_text'] + insert_rule['insert_text']
|
||||
)
|
||||
|
||||
if 'before_text' in insert_rule:
|
||||
print(f" Inserting text before: {insert_rule['before_text']}")
|
||||
content = content.replace(
|
||||
insert_rule['before_text'],
|
||||
insert_rule['insert_text'] + insert_rule['before_text']
|
||||
)
|
||||
|
||||
if 'old_text' in insert_rule:
|
||||
print(f" Replacing text: {insert_rule['old_text']} -> {insert_rule['new_text']}")
|
||||
content = content.replace(
|
||||
insert_rule['old_text'],
|
||||
insert_rule['new_text']
|
||||
)
|
||||
|
||||
# Write modified contents
|
||||
with open(file_path, 'w', encoding='utf-8') as f:
|
||||
f.write(content)
|
||||
try:
|
||||
if re.match(filePattern, filename):
|
||||
filePath = os.path.join(root, filename)
|
||||
print(f"Modifying file: {filePath}")
|
||||
|
||||
# Read file content
|
||||
with open(filePath, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
# Perform text replacements
|
||||
for insertRule in rule.get('insert_rules', []):
|
||||
if 'after_text' in insertRule:
|
||||
print(f" Inserting text after: {insertRule['after_text']}")
|
||||
content = content.replace(
|
||||
insertRule['after_text'],
|
||||
insertRule['after_text'] + insertRule['insert_text']
|
||||
)
|
||||
|
||||
if 'before_text' in insertRule:
|
||||
print(f" Inserting text before: {insertRule['before_text']}")
|
||||
content = content.replace(
|
||||
insertRule['before_text'],
|
||||
insertRule['insert_text'] + insertRule['before_text']
|
||||
)
|
||||
|
||||
if 'old_text' in insertRule:
|
||||
print(f" Replacing text: {insertRule['old_text']} -> {insertRule['new_text']}")
|
||||
content = content.replace(
|
||||
insertRule['old_text'],
|
||||
insertRule['new_text']
|
||||
)
|
||||
|
||||
# Write modified contents
|
||||
with open(filePath, 'w', encoding='utf-8') as f:
|
||||
f.write(content)
|
||||
|
||||
results['modifications'] += 1
|
||||
except PermissionError:
|
||||
print(f"Error: Permission denied when modifying {filePath}")
|
||||
except IOError as e:
|
||||
print(f"Error reading/writing file {filePath}: {e}")
|
||||
|
||||
print("File modification process completed.")
|
||||
|
||||
def main(config_path):
|
||||
|
||||
def main():
|
||||
"""Main function to execute all operations."""
|
||||
# Load configuration
|
||||
config = loadConfig(config_path)
|
||||
# Check command-line argument
|
||||
if len(sys.argv) < 2:
|
||||
print("Please provide the path to the configuration file.")
|
||||
sys.exit(1)
|
||||
|
||||
configPath = sys.argv[1]
|
||||
|
||||
# Load configuration
|
||||
config = loadConfig(configPath)
|
||||
|
||||
# Initialize results
|
||||
results = {"copies": 0, "modifications": 0}
|
||||
|
||||
# Ensure destination directory
|
||||
destination_directory = config.get('destination_directory', './web')
|
||||
ensureDirectory(destination_directory)
|
||||
destinationDirectory = config.get('destination_directory', './web')
|
||||
ensureDirectory(destinationDirectory)
|
||||
|
||||
# Copy files and folders
|
||||
copySources(config, destination_directory)
|
||||
copySources(config, destinationDirectory, results)
|
||||
|
||||
# Modify files
|
||||
modifyFiles(config, destination_directory)
|
||||
|
||||
print(f"All operations in {destination_directory} completed successfully.")
|
||||
modifyFiles(config, destinationDirectory, results)
|
||||
|
||||
# Print results
|
||||
print(f'\nTotal successful copies: {results["copies"]}')
|
||||
print(f'Total file modifications: {results["modifications"]}')
|
||||
print(f"All operations in {destinationDirectory} completed successfully.")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main('config.yaml')
|
||||
main()
|
Reference in New Issue
Block a user