Changed from running subprocess to running imported methods.

This commit is contained in:
Ada Werefox 2026-06-02 16:05:53 -07:00
parent 0b7ac9e0a4
commit 0b398a2b4f
3 changed files with 157 additions and 51 deletions

View file

@ -57,31 +57,40 @@ Download a .7z archive from Nextcloud, backup local files, and replace them.""")
return parser.parse_args()
def download_backup(remote_archive_path: str):
def download_backup(
remote_archive_path: str,
hostname=NC_URL,
user=NC_USER,
password=NC_PASS,
temp_download_path=TEMP_DOWNLOAD_NAME,
):
"""Downloads the requested backup archive from Nextcloud"""
# Connect to Nextcloud
print("Connecting to Nextcloud...")
nc = NextCloud(NC_URL, user=NC_USER, password=NC_PASS)
nc = NextCloud(hostname, user=user, password=password)
# Download 7z archive from Nextcloud
print(f"Downloading '{remote_archive_path}' from Nextcloud...")
try:
downloaded_file = nc.get_file(remote_archive_path)
downloaded_file.download(target=TEMP_DOWNLOAD_NAME)
downloaded_file = nc.get_file(remote_archive_path) # type: ignore
downloaded_file.download(target=temp_download_path)
except Exception as e:
print(f"Error downloading file: {e}")
return
def decompress_backup():
def decompress_backup(
temp_extract_path=TEMP_EXTRACT_PATH,
temp_download_path=TEMP_DOWNLOAD_NAME,
):
"""Extract the files from the backup"""
# Uncompress the downloaded .7z archive into target directory
print(f"Extracting .7z files to:\n -> {TEMP_EXTRACT_PATH}")
print(f"Extracting .7z files to:\n -> {temp_extract_path}")
# 7z flags used:
# 'x' = extract with full paths
# '-o...' = target output directory (no space between -o and the path)
# '-y' = assume Yes on all queries (overwrite prompts)
cmd = ["7z", "x", TEMP_DOWNLOAD_NAME, f"-o{TEMP_EXTRACT_PATH}", "-y"]
cmd = ["7z", "x", temp_download_path, f"-o{temp_extract_path}", "-y"]
try:
# Run command and capture output; raises CalledProcessError if return code != 0
@ -96,15 +105,19 @@ def decompress_backup():
finally:
# Clean up downloaded archive file from configured path
if os.path.exists(TEMP_DOWNLOAD_NAME):
os.remove(TEMP_DOWNLOAD_NAME)
if os.path.exists(temp_download_path):
os.remove(temp_download_path)
def backup_and_restore(
backup_config_path: str, restore_config_path: str, is_ffxiv_config: bool
backup_config_path: str,
restore_config_path: str,
is_ffxiv_config: bool,
temp_extract_path=TEMP_EXTRACT_PATH,
):
"""Create backup of current config and restore downloaded backup"""
# Create backup of FFXIV config directory if it exists and has content
print(backup_config_path)
if os.path.exists(restore_config_path) and os.listdir(restore_config_path):
print(
"Creating safety backup of "
@ -119,14 +132,18 @@ def backup_and_restore(
)
shutil.rmtree(restore_config_path)
except Exception as e:
print(f"Failed during ffxiv config backup creation phase: {e}")
print(
f"Failed during {'ffxiv' if is_ffxiv_config else 'plugins'} config backup creation phase: {e}"
)
return
# Restore FFXIV config backup files
try:
print("Attempting to restore ffxiv config backups")
print(
f"Attempting to restore {'ffxiv' if is_ffxiv_config else 'plugins'} config backups"
)
shutil.copytree(
f"{TEMP_EXTRACT_PATH}\\{"game" if is_ffxiv_config else "plugins"}\\",
f"{temp_extract_path}\\{"game" if is_ffxiv_config else "plugins"}\\",
restore_config_path,
dirs_exist_ok=True,
)
@ -135,6 +152,25 @@ def backup_and_restore(
return
def temp_cleanup(temp_extract_path=TEMP_EXTRACT_PATH):
# Remove temporary extract files
try:
shutil.rmtree(temp_extract_path) # type: ignore
except Exception as e:
print(f"Failed to remove temporary extracted backup files: {e}")
return
def create_backup_dir(temp_backup_path=TEMP_BACKUP_PATH) -> str:
"""Create the unique directory that will store the extracted backup files"""
# Generate a unique, timestamped backup folder name in the temp backup directory
parent_dir = os.path.dirname(temp_backup_path) # type: ignore
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
backup_dir = os.path.join(parent_dir, f"backup_{timestamp}")
os.makedirs(backup_dir, exist_ok=True)
return backup_dir
def main():
# Parse command-line inputs and validate env
args = parse_arguments()
@ -148,24 +184,15 @@ def main():
download_backup(remote_archive_path)
# Generate a unique, timestamped backup folder name in the temp backup directory
parent_dir = os.path.dirname(TEMP_BACKUP_PATH)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
backup_dir = os.path.join(parent_dir, f"backup_{timestamp}")
os.makedirs(backup_dir, exist_ok=True)
backup_dir = create_backup_dir()
decompress_backup()
backup_and_restore(backup_dir, FFXIV_CONFIGS_PATH, True)
backup_and_restore(backup_dir, FFXIV_CONFIGS_PATH, True) # type: ignore
backup_and_restore(backup_dir, PLUGIN_CONFIG_PATH, False)
backup_and_restore(backup_dir, PLUGIN_CONFIG_PATH, False) # type: ignore
# Remove temporary extract files
try:
shutil.rmtree(TEMP_EXTRACT_PATH)
except Exception as e:
print(f"Failed to remove temporary extracted backup files: {e}")
return
temp_cleanup()
if __name__ == "__main__":

View file

@ -5,7 +5,7 @@ import os
import subprocess
from dotenv import load_dotenv
from datetime import datetime
from nextcloud import NextCloud
from nextcloud import NextCloud # type: ignore
from PyQt6.QtWidgets import (
QApplication,
QMainWindow,
@ -28,6 +28,9 @@ from PyQt6.QtWidgets import (
)
from PyQt6.QtCore import Qt, QThread, pyqtSignal
from PyQt6.QtGui import QFont
import restore_backup as rb
from contextlib import redirect_stdout
from io import StringIO
class WebDAVBrowser(QDialog):
@ -140,39 +143,104 @@ class WebDAVBrowser(QDialog):
class RestoreWorker(QThread):
output_signal = pyqtSignal(str)
finished_signal = pyqtSignal(bool, str)
# # --- CONFIGURATION FROM ENVIRONMENT ---
# NC_URL = os.getenv("NC_URL")
# NC_USER = os.getenv("NC_USER")
# NC_PASS = os.getenv("NC_PASS")
# TEMP_DOWNLOAD_NAME = os.getenv("NC_TEMP_DOWNLOAD", "\tmp\temp_download.7z")
# TEMP_EXTRACT_PATH = os.getenv("TEMP_EXTRACT_PATH")
# TEMP_BACKUP_PATH = os.getenv("TEMP_BACKUP_PATH")
# PLUGIN_CONFIG_PATH = os.getenv("PLUGIN_CONFIGS_PATH")
# FFXIV_CONFIGS_PATH = os.getenv("FFXIV_CONFIGS_PATH")
# # --------------------------------------
def __init__(self, remote_path):
def __init__(
self,
remote_path,
hostname,
username,
password,
temp_download_path,
temp_download_name,
temp_extract_path,
temp_backup_path,
ffxiv_configs_path,
plugin_configs_path,
):
super().__init__()
self.remote_path = remote_path
self.remote_path: str = remote_path
self.hostname: str = hostname
self.username: str = username
self.password: str = password
self.temp_download_path: str = temp_download_path
self.temp_download_name: str = temp_download_name
self.temp_extract_path: str = temp_extract_path
self.temp_backup_path: str = temp_backup_path
self.ffxiv_configs_path: str = ffxiv_configs_path
self.plugin_configs_path: str = plugin_configs_path
def run(self):
try:
# Run the restore script with the remote path
cmd = [sys.executable, "restore_backup.py", "-r", self.remote_path]
# cmd = [sys.executable, "restore_backup.py", "-r", self.remote_path]
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)),
# process = subprocess.Popen(
# cmd,
# stdout=subprocess.PIPE,
# stderr=subprocess.STDOUT,
# text=True,
# bufsize=1,
# cwd=os.path.dirname(os.path.abspath(__file__)),
# )
# if process.stdout:
# for line in process.stdout:
# self.output_signal.emit(line.rstrip())
# process.wait()
# if process.returncode == 0:
# self.finished_signal.emit(True, "Restore completed successfully!")
# else:
# self.finished_signal.emit(
# False, f"Restore failed with return code {process.returncode}"
# )
output_buffer = StringIO()
with redirect_stdout(output_buffer):
rb.download_backup(
self.remote_path,
hostname=self.hostname,
user=self.username,
password=self.password,
temp_download_path=f"{self.temp_download_path}/{self.temp_download_name}",
)
backup_dir = rb.create_backup_dir(
temp_backup_path=f"{self.temp_backup_path}\\."
)
rb.decompress_backup(
temp_extract_path=f"{self.temp_download_path}/{self.temp_extract_path}",
temp_download_path=f"{self.temp_download_path}/{self.temp_download_name}",
)
rb.backup_and_restore(
backup_config_path=backup_dir,
restore_config_path=self.ffxiv_configs_path,
is_ffxiv_config=True,
temp_extract_path=f"{self.temp_download_path}/{self.temp_extract_path}",
)
rb.backup_and_restore(
backup_config_path=backup_dir,
restore_config_path=self.plugin_configs_path,
is_ffxiv_config=False,
temp_extract_path=f"{self.temp_download_path}/{self.temp_extract_path}",
)
rb.temp_cleanup(f"{self.temp_download_path}/{self.temp_extract_path}")
if process.stdout:
for line in process.stdout:
self.output_signal.emit(line.rstrip())
process.wait()
if process.returncode == 0:
self.output_signal.emit(output_buffer.getvalue())
self.finished_signal.emit(True, "Restore completed successfully!")
else:
self.finished_signal.emit(
False, f"Restore failed with return code {process.returncode}"
)
except Exception as e:
self.output_signal.emit(output_buffer.getvalue())
self.finished_signal.emit(False, f"Error: {str(e)}")
@ -402,7 +470,18 @@ class RestoreUI(QMainWindow):
)
self.output_text.append(f"Remote path: {remote_path}\n")
self.restore_worker = RestoreWorker(remote_path)
self.restore_worker = RestoreWorker(
remote_path,
self.nc_url_input.text(),
self.nc_user_input.text(),
self.nc_pass_input.text(),
self.temp_dir_input.text(),
self.temp_download_name_input.text(),
self.temp_extract_name_input.text(),
self.temp_backup_input.text(),
self.ffxiv_configs_input.text(),
self.plugin_configs_input.text(),
)
self.restore_worker.output_signal.connect(self.append_output)
self.restore_worker.finished_signal.connect(self.restore_finished)
self.restore_worker.start()

View file

@ -6,7 +6,7 @@ a = Analysis(
['restore_ui.py', 'restore_backup.py'],
pathex=[],
binaries=[],
datas=[('.env', '.')], # Include .env file
datas=[], # Include .env file
hiddenimports=['PyQt6.QtCore', 'PyQt6.QtGui', 'PyQt6.QtWidgets'],
hookspath=[],
hooksconfig={},