451 lines
15 KiB
Python
451 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import sys
|
|
import os
|
|
import subprocess
|
|
from dotenv import load_dotenv
|
|
from datetime import datetime
|
|
from nextcloud import NextCloud
|
|
from PyQt6.QtWidgets import (
|
|
QApplication,
|
|
QMainWindow,
|
|
QWidget,
|
|
QVBoxLayout,
|
|
QHBoxLayout,
|
|
QTabWidget,
|
|
QLabel,
|
|
QLineEdit,
|
|
QPushButton,
|
|
QTextEdit,
|
|
QFileDialog,
|
|
QMessageBox,
|
|
QProgressBar,
|
|
QGroupBox,
|
|
QFormLayout,
|
|
QDialog,
|
|
QListWidget,
|
|
QListWidgetItem,
|
|
)
|
|
from PyQt6.QtCore import Qt, QThread, pyqtSignal
|
|
from PyQt6.QtGui import QFont
|
|
from pprint import pprint
|
|
|
|
|
|
class WebDAVBrowser(QDialog):
|
|
def __init__(self, parent, nc_url, nc_user, nc_pass):
|
|
super().__init__(parent)
|
|
self.setWindowTitle("Browse Nextcloud")
|
|
self.setGeometry(100, 100, 600, 400)
|
|
self.nc_url = nc_url
|
|
self.nc_user = nc_user
|
|
self.nc_pass = nc_pass
|
|
self.selected_path = None
|
|
self.current_path = "/"
|
|
|
|
try:
|
|
self.nc = NextCloud(nc_url, user=nc_user, password=nc_pass)
|
|
self.nc.login()
|
|
except Exception as e:
|
|
QMessageBox.critical(
|
|
self, "Connection Error", f"Failed to connect to Nextcloud: {str(e)}"
|
|
)
|
|
self.reject()
|
|
return
|
|
|
|
self.init_ui()
|
|
self.load_directory(self.current_path)
|
|
|
|
def init_ui(self):
|
|
layout = QVBoxLayout()
|
|
|
|
# Path display and navigation
|
|
nav_layout = QHBoxLayout()
|
|
self.path_label = QLabel(self.current_path)
|
|
nav_layout.addWidget(QLabel("Current Path:"), 0)
|
|
nav_layout.addWidget(self.path_label, 1)
|
|
|
|
up_btn = QPushButton("Up")
|
|
up_btn.clicked.connect(self.go_up)
|
|
nav_layout.addWidget(up_btn)
|
|
|
|
layout.addLayout(nav_layout)
|
|
|
|
# File list
|
|
self.file_list = QListWidget()
|
|
self.file_list.itemDoubleClicked.connect(self.item_double_clicked)
|
|
layout.addWidget(self.file_list)
|
|
|
|
# Buttons
|
|
btn_layout = QHBoxLayout()
|
|
select_btn = QPushButton("Select")
|
|
select_btn.clicked.connect(self.select_file)
|
|
cancel_btn = QPushButton("Cancel")
|
|
cancel_btn.clicked.connect(self.reject)
|
|
|
|
btn_layout.addStretch()
|
|
btn_layout.addWidget(select_btn)
|
|
btn_layout.addWidget(cancel_btn)
|
|
|
|
layout.addLayout(btn_layout)
|
|
self.setLayout(layout)
|
|
|
|
def load_directory(self, path):
|
|
try:
|
|
self.file_list.clear()
|
|
self.current_path = path
|
|
self.path_label.setText(path)
|
|
|
|
response = self.nc.list_folders(path=path) # type: ignore
|
|
pprint(response.json_data)
|
|
|
|
# Try to extract data from webDavResponse
|
|
file_list = []
|
|
if hasattr(response, "__iter__") and not isinstance(response, (str, bytes)):
|
|
try:
|
|
file_list = list(response)
|
|
except TypeError:
|
|
# If direct iteration fails, try accessing as dict keys
|
|
if isinstance(response, dict):
|
|
file_list = list(response.keys())
|
|
elif hasattr(response, "data"):
|
|
file_list = (
|
|
response.data
|
|
if isinstance(response.data, list)
|
|
else list(response.data)
|
|
)
|
|
|
|
for item in sorted(file_list):
|
|
name = str(item).rstrip("/")
|
|
if name:
|
|
is_dir = str(item).endswith("/")
|
|
display_name = f"[DIR] {name}" if is_dir else name
|
|
list_item = QListWidgetItem(display_name)
|
|
list_item.setData(Qt.ItemDataRole.UserRole, (name, is_dir))
|
|
self.file_list.addItem(list_item)
|
|
|
|
except Exception as e:
|
|
QMessageBox.critical(self, "Error", f"Failed to load directory: {str(e)}")
|
|
|
|
def item_double_clicked(self, item):
|
|
name, is_dir = item.data(Qt.ItemDataRole.UserRole)
|
|
if is_dir:
|
|
new_path = self.current_path.rstrip("/") + "/" + name
|
|
self.load_directory(new_path)
|
|
|
|
def go_up(self):
|
|
if self.current_path != "/":
|
|
parent = "/".join(self.current_path.rstrip("/").split("/")[:-1]) or "/"
|
|
self.load_directory(parent)
|
|
|
|
def select_file(self):
|
|
current_item = self.file_list.currentItem()
|
|
if current_item:
|
|
name, is_dir = current_item.data(Qt.ItemDataRole.UserRole)
|
|
if not is_dir:
|
|
self.selected_path = self.current_path.rstrip("/") + "/" + name
|
|
self.accept()
|
|
else:
|
|
QMessageBox.warning(
|
|
self, "Invalid Selection", "Please select a file, not a directory"
|
|
)
|
|
else:
|
|
QMessageBox.warning(self, "No Selection", "Please select a file")
|
|
|
|
|
|
class RestoreWorker(QThread):
|
|
output_signal = pyqtSignal(str)
|
|
finished_signal = pyqtSignal(bool, str)
|
|
|
|
def __init__(self, remote_path):
|
|
super().__init__()
|
|
self.remote_path = remote_path
|
|
|
|
def run(self):
|
|
try:
|
|
# Run the restore script with the remote path
|
|
cmd = [sys.executable, "restore_backup.py", "-r", self.remote_path]
|
|
|
|
process = subprocess.Popen(
|
|
cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
text=True,
|
|
bufsize=1,
|
|
cwd=os.path.dirname(os.path.abspath(__file__)),
|
|
)
|
|
|
|
for line in process.stdout:
|
|
self.output_signal.emit(line.rstrip())
|
|
|
|
process.wait()
|
|
|
|
if process.returncode == 0:
|
|
self.finished_signal.emit(True, "Restore completed successfully!")
|
|
else:
|
|
self.finished_signal.emit(
|
|
False, f"Restore failed with return code {process.returncode}"
|
|
)
|
|
|
|
except Exception as e:
|
|
self.finished_signal.emit(False, f"Error: {str(e)}")
|
|
|
|
|
|
class RestoreUI(QMainWindow):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.setWindowTitle("FFXIV Backup Restore")
|
|
self.setGeometry(100, 100, 900, 700)
|
|
|
|
load_dotenv()
|
|
self.restore_worker = None
|
|
|
|
self.init_ui()
|
|
|
|
def init_ui(self):
|
|
central_widget = QWidget()
|
|
self.setCentralWidget(central_widget)
|
|
layout = QVBoxLayout()
|
|
|
|
tabs = QTabWidget()
|
|
tabs.addTab(self.create_settings_tab(), "Settings")
|
|
tabs.addTab(self.create_restore_tab(), "Restore")
|
|
|
|
layout.addWidget(tabs)
|
|
central_widget.setLayout(layout)
|
|
|
|
def create_settings_tab(self):
|
|
widget = QWidget()
|
|
layout = QVBoxLayout()
|
|
|
|
# Nextcloud Section
|
|
nc_group = QGroupBox("Nextcloud Configuration")
|
|
nc_layout = QFormLayout()
|
|
|
|
self.nc_url_input = QLineEdit(os.getenv("NC_URL", ""))
|
|
self.nc_user_input = QLineEdit(os.getenv("NC_USER", ""))
|
|
self.nc_pass_input = QLineEdit(os.getenv("NC_PASS", ""))
|
|
self.nc_pass_input.setEchoMode(QLineEdit.EchoMode.Password)
|
|
|
|
nc_layout.addRow("Nextcloud URL:", self.nc_url_input)
|
|
nc_layout.addRow("Username:", self.nc_user_input)
|
|
nc_layout.addRow("Password:", self.nc_pass_input)
|
|
nc_group.setLayout(nc_layout)
|
|
|
|
# Paths Section
|
|
paths_group = QGroupBox("Local Paths")
|
|
paths_layout = QVBoxLayout()
|
|
|
|
# Temp directory (with browse)
|
|
self.temp_dir_input = QLineEdit(
|
|
os.path.dirname(os.getenv("NC_TEMP_DOWNLOAD", ""))
|
|
)
|
|
paths_layout.addLayout(
|
|
self._create_path_row(
|
|
"Temp Directory:", self.temp_dir_input, self.browse_temp_directory
|
|
)
|
|
)
|
|
|
|
# Temp filenames
|
|
self.temp_download_name_input = QLineEdit(
|
|
os.path.basename(os.getenv("NC_TEMP_DOWNLOAD", "temp_download.7z"))
|
|
)
|
|
temp_download_layout = QHBoxLayout()
|
|
temp_download_layout.addWidget(QLabel("Temp Download Filename:"), 1)
|
|
temp_download_layout.addWidget(self.temp_download_name_input, 3)
|
|
paths_layout.addLayout(temp_download_layout)
|
|
|
|
self.temp_extract_name_input = QLineEdit(
|
|
os.path.basename(os.getenv("TEMP_EXTRACT_PATH", "temp_extract"))
|
|
)
|
|
temp_extract_layout = QHBoxLayout()
|
|
temp_extract_layout.addWidget(QLabel("Temp Extract Subdirectory:"), 1)
|
|
temp_extract_layout.addWidget(self.temp_extract_name_input, 3)
|
|
paths_layout.addLayout(temp_extract_layout)
|
|
|
|
# Other paths (with browse)
|
|
self.temp_backup_input = QLineEdit(os.getenv("TEMP_BACKUP_PATH", ""))
|
|
self.plugin_configs_input = QLineEdit(os.getenv("PLUGIN_CONFIGS_PATH", ""))
|
|
self.ffxiv_configs_input = QLineEdit(os.getenv("FFXIV_CONFIGS_PATH", ""))
|
|
|
|
paths_layout.addLayout(
|
|
self._create_path_row(
|
|
"Temp Backup:", self.temp_backup_input, self.browse_temp_backup
|
|
)
|
|
)
|
|
paths_layout.addLayout(
|
|
self._create_path_row(
|
|
"Plugin Configs:", self.plugin_configs_input, self.browse_plugin_configs
|
|
)
|
|
)
|
|
paths_layout.addLayout(
|
|
self._create_path_row(
|
|
"FFXIV Configs:", self.ffxiv_configs_input, self.browse_ffxiv_configs
|
|
)
|
|
)
|
|
|
|
paths_group.setLayout(paths_layout)
|
|
|
|
layout.addWidget(nc_group)
|
|
layout.addWidget(paths_group)
|
|
layout.addStretch()
|
|
|
|
widget.setLayout(layout)
|
|
return widget
|
|
|
|
def _create_path_row(self, label, input_field, browse_callback):
|
|
row_layout = QHBoxLayout()
|
|
row_layout.addWidget(QLabel(label), 1)
|
|
row_layout.addWidget(input_field, 3)
|
|
browse_btn = QPushButton("Browse...")
|
|
browse_btn.clicked.connect(browse_callback)
|
|
row_layout.addWidget(browse_btn)
|
|
return row_layout
|
|
|
|
def browse_temp_directory(self):
|
|
path = QFileDialog.getExistingDirectory(self, "Select Temp Directory")
|
|
if path:
|
|
self.temp_dir_input.setText(path)
|
|
|
|
def browse_temp_backup(self):
|
|
path = QFileDialog.getExistingDirectory(self, "Select Temp Backup Directory")
|
|
if path:
|
|
self.temp_backup_input.setText(path)
|
|
|
|
def browse_plugin_configs(self):
|
|
path = QFileDialog.getExistingDirectory(self, "Select Plugin Configs Directory")
|
|
if path:
|
|
self.plugin_configs_input.setText(path)
|
|
|
|
def browse_ffxiv_configs(self):
|
|
path = QFileDialog.getExistingDirectory(self, "Select FFXIV Configs Directory")
|
|
if path:
|
|
self.ffxiv_configs_input.setText(path)
|
|
|
|
def create_restore_tab(self):
|
|
widget = QWidget()
|
|
layout = QVBoxLayout()
|
|
|
|
# Remote path input
|
|
remote_group = QGroupBox("Select Backup to Restore")
|
|
remote_layout = QHBoxLayout()
|
|
|
|
QLabel("Remote Archive Path:").setFont(QFont("Arial", 10))
|
|
self.remote_path_input = QLineEdit()
|
|
self.remote_path_input.setPlaceholderText(
|
|
"e.g., Backups/FFXIV/Desktop/backup_file.7z"
|
|
)
|
|
|
|
remote_layout.addWidget(QLabel("Remote Archive Path:"))
|
|
remote_layout.addWidget(self.remote_path_input)
|
|
|
|
browse_webdav_btn = QPushButton("Browse WebDAV")
|
|
browse_webdav_btn.clicked.connect(self.browse_webdav)
|
|
remote_layout.addWidget(browse_webdav_btn)
|
|
|
|
remote_group.setLayout(remote_layout)
|
|
layout.addWidget(remote_group)
|
|
|
|
# Output display
|
|
output_group = QGroupBox("Operation Progress")
|
|
output_layout = QVBoxLayout()
|
|
|
|
self.output_text = QTextEdit()
|
|
self.output_text.setReadOnly(True)
|
|
self.output_text.setFont(QFont("Courier New", 9))
|
|
|
|
self.progress_bar = QProgressBar()
|
|
self.progress_bar.setVisible(False)
|
|
|
|
output_layout.addWidget(self.output_text)
|
|
output_layout.addWidget(self.progress_bar)
|
|
|
|
output_group.setLayout(output_layout)
|
|
layout.addWidget(output_group)
|
|
|
|
# Control buttons
|
|
button_layout = QHBoxLayout()
|
|
|
|
self.restore_button = QPushButton("Start Restore")
|
|
self.restore_button.clicked.connect(self.start_restore)
|
|
|
|
self.clear_button = QPushButton("Clear Output")
|
|
self.clear_button.clicked.connect(self.output_text.clear)
|
|
|
|
button_layout.addWidget(self.restore_button)
|
|
button_layout.addWidget(self.clear_button)
|
|
button_layout.addStretch()
|
|
|
|
layout.addLayout(button_layout)
|
|
|
|
widget.setLayout(layout)
|
|
return widget
|
|
|
|
def browse_webdav(self):
|
|
nc_url = self.nc_url_input.text().strip()
|
|
nc_user = self.nc_user_input.text().strip()
|
|
nc_pass = self.nc_pass_input.text().strip()
|
|
|
|
if not all([nc_url, nc_user, nc_pass]):
|
|
QMessageBox.warning(
|
|
self,
|
|
"Missing Credentials",
|
|
"Please configure Nextcloud credentials in Settings tab first",
|
|
)
|
|
return
|
|
|
|
browser = WebDAVBrowser(self, nc_url, nc_user, nc_pass)
|
|
if browser.exec() == QDialog.DialogCode.Accepted and browser.selected_path:
|
|
self.remote_path_input.setText(browser.selected_path)
|
|
|
|
def start_restore(self):
|
|
remote_path = self.remote_path_input.text().strip()
|
|
|
|
if not remote_path:
|
|
QMessageBox.warning(
|
|
self, "Input Required", "Please enter the remote archive path"
|
|
)
|
|
return
|
|
|
|
self.output_text.clear()
|
|
self.progress_bar.setVisible(True)
|
|
self.progress_bar.setMaximum(0)
|
|
self.restore_button.setEnabled(False)
|
|
|
|
self.output_text.append(
|
|
f"[{datetime.now().strftime('%H:%M:%S')}] Starting restore process..."
|
|
)
|
|
self.output_text.append(f"Remote path: {remote_path}\n")
|
|
|
|
self.restore_worker = RestoreWorker(remote_path)
|
|
self.restore_worker.output_signal.connect(self.append_output)
|
|
self.restore_worker.finished_signal.connect(self.restore_finished)
|
|
self.restore_worker.start()
|
|
|
|
def append_output(self, text):
|
|
self.output_text.append(text)
|
|
self.output_text.verticalScrollBar().setValue(
|
|
self.output_text.verticalScrollBar().maximum()
|
|
)
|
|
|
|
def restore_finished(self, success, message):
|
|
self.progress_bar.setVisible(False)
|
|
self.restore_button.setEnabled(True)
|
|
|
|
timestamp = datetime.now().strftime("%H:%M:%S")
|
|
self.output_text.append(f"\n[{timestamp}] {message}")
|
|
|
|
if success:
|
|
QMessageBox.information(self, "Success", message)
|
|
else:
|
|
QMessageBox.critical(self, "Error", message)
|
|
|
|
|
|
def main():
|
|
app = QApplication(sys.argv)
|
|
window = RestoreUI()
|
|
window.show()
|
|
sys.exit(app.exec())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|