229 lines
7.7 KiB
Python
229 lines
7.7 KiB
Python
import os
|
|
import re
|
|
import sys
|
|
import yaml
|
|
import shutil
|
|
import glob
|
|
|
|
def load_configuration(config_path):
|
|
"""
|
|
Load configuration from a YAML file.
|
|
"""
|
|
with open(config_path, 'r', encoding='utf-8') as config_file:
|
|
return yaml.safe_load(config_file)
|
|
|
|
def copy_with_mode(src, dest, mode='copy'):
|
|
"""
|
|
Copy files or directories with different modes and specific source/target paths.
|
|
"""
|
|
try:
|
|
# Ensure full paths are used
|
|
src = os.path.abspath(src)
|
|
dest = os.path.abspath(dest)
|
|
|
|
# Ensure destination directory exists
|
|
os.makedirs(os.path.dirname(dest), exist_ok=True)
|
|
|
|
# Determine copy mode
|
|
if mode == 'copy':
|
|
# Skip if destination already exists
|
|
if os.path.exists(dest):
|
|
print(f'Skipping {src}: Destination already exists')
|
|
return False
|
|
|
|
elif mode == 'replace':
|
|
# Remove existing destination before copying
|
|
if os.path.exists(dest):
|
|
if os.path.isdir(dest):
|
|
shutil.rmtree(dest)
|
|
else:
|
|
os.remove(dest)
|
|
|
|
elif mode == 'update':
|
|
# Only copy if source is newer
|
|
if os.path.exists(dest):
|
|
src_mtime = os.path.getmtime(src)
|
|
dest_mtime = os.path.getmtime(dest)
|
|
if src_mtime <= dest_mtime:
|
|
print(f'Skipping {src}: Destination is up to date')
|
|
return False
|
|
|
|
# Perform copy
|
|
if os.path.isdir(src):
|
|
shutil.copytree(src, dest)
|
|
else:
|
|
shutil.copy2(src, dest)
|
|
|
|
print(f'Copied: {src} -> {dest}')
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f'Error copying {src}: {e}')
|
|
return False
|
|
|
|
def matches_file_pattern(filename, pattern):
|
|
"""
|
|
Check if filename matches the given pattern.
|
|
"""
|
|
return re.search(pattern, filename) is not None
|
|
|
|
def apply_insert_rules(content, insert_rules):
|
|
"""
|
|
Apply insertion rules to the content.
|
|
"""
|
|
modified_content = content
|
|
modified = False
|
|
|
|
for rule in insert_rules:
|
|
insert_text = rule.get('insert_text')
|
|
|
|
# Insert after specific text
|
|
if 'after_text' in rule:
|
|
search_text = rule.get('after_text')
|
|
if insert_text not in modified_content:
|
|
modified_content = modified_content.replace(
|
|
search_text,
|
|
f'{search_text}\n{insert_text}'
|
|
)
|
|
modified = True
|
|
|
|
# Insert before specific text
|
|
elif 'before_text' in rule:
|
|
search_text = rule.get('before_text')
|
|
if insert_text not in modified_content:
|
|
modified_content = modified_content.replace(
|
|
search_text,
|
|
f'{insert_text}\n{search_text}'
|
|
)
|
|
modified = True
|
|
|
|
return modified_content, modified
|
|
|
|
def apply_replace_rules(content, replace_rules):
|
|
"""
|
|
Apply replacement rules to the content.
|
|
"""
|
|
modified_content = content
|
|
modified = False
|
|
|
|
for rule in replace_rules:
|
|
old_text = rule.get('old_text')
|
|
new_text = rule.get('new_text')
|
|
|
|
# Replace text if found
|
|
if old_text in modified_content:
|
|
modified_content = modified_content.replace(old_text, new_text)
|
|
modified = True
|
|
|
|
return modified_content, modified
|
|
|
|
def process_copy_and_modify_rules(config):
|
|
"""
|
|
Process copy, insertion, and replacement rules.
|
|
"""
|
|
successful_operations = {
|
|
'copies': 0,
|
|
'modifications': 0
|
|
}
|
|
|
|
# Process copy rules
|
|
for copy_rule in config.get('copy_rules', []):
|
|
sources = copy_rule.get('sources', [])
|
|
mode = copy_rule.get('mode', 'copy')
|
|
|
|
for source_info in sources:
|
|
# Support both simple string and dictionary input
|
|
if isinstance(source_info, str):
|
|
src = source_info
|
|
dest = os.path.join(
|
|
config.get('destination_directory', '.'),
|
|
os.path.basename(src)
|
|
)
|
|
elif isinstance(source_info, dict):
|
|
src = source_info.get('source')
|
|
dest = source_info.get('target')
|
|
|
|
# Fallback if target is not specified
|
|
if not dest:
|
|
dest = os.path.join(
|
|
config.get('destination_directory', '.'),
|
|
os.path.basename(src)
|
|
)
|
|
else:
|
|
print(f'Invalid source configuration: {source_info}')
|
|
continue
|
|
|
|
# Expand potential wildcards
|
|
matching_sources = glob.glob(src)
|
|
|
|
for matched_src in matching_sources:
|
|
# Determine full destination path
|
|
if os.path.isdir(matched_src):
|
|
full_dest = os.path.join(dest, os.path.basename(matched_src))
|
|
else:
|
|
full_dest = dest
|
|
|
|
# Perform copy
|
|
if copy_with_mode(matched_src, full_dest, mode):
|
|
successful_operations['copies'] += 1
|
|
|
|
# Process modification rules
|
|
destination_dir = config.get('destination_directory', '.')
|
|
|
|
for mod_rule in config.get('modification_rules', []):
|
|
search_pattern = mod_rule.get('file_pattern')
|
|
insert_rules = mod_rule.get('insert_rules', [])
|
|
replace_rules = mod_rule.get('replace_rules', [])
|
|
|
|
# Walk through destination directory
|
|
for root, _, files in os.walk(destination_dir):
|
|
for filename in files:
|
|
# Check if file matches pattern
|
|
if matches_file_pattern(filename, search_pattern):
|
|
full_path = os.path.join(root, filename)
|
|
|
|
try:
|
|
# Read file content
|
|
with open(full_path, 'r', encoding='utf-8') as file:
|
|
content = file.read()
|
|
|
|
# Apply insert rules
|
|
content_after_insert, insert_modified = apply_insert_rules(
|
|
content, insert_rules
|
|
)
|
|
|
|
# Apply replace rules
|
|
final_content, replace_modified = apply_replace_rules(
|
|
content_after_insert, replace_rules
|
|
)
|
|
|
|
# Save modified file
|
|
if insert_modified or replace_modified:
|
|
with open(full_path, 'w', encoding='utf-8') as file:
|
|
file.write(final_content)
|
|
|
|
successful_operations['modifications'] += 1
|
|
print(f'Modified: {full_path}')
|
|
|
|
except Exception as e:
|
|
print(f'Error processing file {full_path}: {e}')
|
|
|
|
return successful_operations
|
|
|
|
def main():
|
|
# Check command-line argument
|
|
if len(sys.argv) < 2:
|
|
print("Please provide the path to the configuration file.")
|
|
sys.exit(1)
|
|
|
|
config_path = sys.argv[1]
|
|
|
|
# Load configuration and process rules
|
|
config = load_configuration(config_path)
|
|
results = process_copy_and_modify_rules(config)
|
|
|
|
print(f'\nTotal successful copies: {results["copies"]}')
|
|
print(f'Total file modifications: {results["modifications"]}')
|
|
|
|
if __name__ == '__main__':
|
|
main() |