import streamlit as st import os import zipfile import tempfile import subprocess import psycopg2 from psycopg2 import sql from pathlib import Path from datetime import date import re st.set_page_config(page_title="DB Backup Importer", layout="wide") st.title("PostgreSQL Backup Importer") def _find_psql(): """Return psql path if found in PATH or common Windows install dirs.""" try: result = subprocess.run( ["psql", "--version"], capture_output=True, timeout=5 ) if result.returncode == 0: return "psql" except Exception: pass for base in [ r"C:\Program Files\PostgreSQL", r"C:\Program Files (x86)\PostgreSQL", ]: if os.path.isdir(base): for version in sorted(os.listdir(base), reverse=True): candidate = os.path.join(base, version, "bin", "psql.exe") if os.path.isfile(candidate): return candidate return "" def get_connection(host, port, user, password, dbname="postgres"): return psycopg2.connect( host=host, port=port, user=user, password=password, dbname=dbname ) def list_zip_files(path): try: files = [f for f in os.listdir(path) if f.lower().endswith(".zip")] return sorted(files, reverse=True) except PermissionError: st.error("Permission denied accessing the folder.") return [] except FileNotFoundError: st.error("Folder not found.") return [] _ZIP_DATE_RE = re.compile( r"bdd_sophal(\d{4})(\d{2})(\d{2})\d{4}\.zip$", re.IGNORECASE ) def parse_zip_date(filename): """Return a date for filenames matching bdd_sophalYYYYMMDDHHMM.zip, else None.""" m = _ZIP_DATE_RE.search(filename) if m: try: return date(int(m.group(1)), int(m.group(2)), int(m.group(3))) except ValueError: return None return None def peek_zip(zip_path): """Return list of .sql filenames inside a ZIP.""" with zipfile.ZipFile(zip_path, "r") as z: return [n for n in z.namelist() if n.lower().endswith(".sql")] def create_database(host, port, user, password, db_name): conn = get_connection(host, port, user, password) conn.autocommit = True cur = conn.cursor() cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(db_name))) cur.close() conn.close() def import_sql(psql_exe, host, port, user, password, db_name, sql_file_path): env = os.environ.copy() env["PGPASSWORD"] = password cmd = [ psql_exe, "-h", host, "-p", str(port), "-U", user, "-d", db_name, "-f", sql_file_path, "--echo-errors", ] result = subprocess.run( cmd, capture_output=True, text=True, env=env, timeout=600, ) return result with st.sidebar: st.header("Configuration") folder_path = st.text_input( "Shared Folder Path", placeholder=r"\\server\backups or Z:\backups", help="Path to the folder containing ZIP backup files", ) st.subheader("PostgreSQL Connection") pg_host = st.text_input("Host", value="localhost") pg_port = st.number_input("Port", value=5432, min_value=1, max_value=65535) pg_user = st.text_input("User", value="postgres") pg_password = st.text_input("Password", type="password") st.subheader("Date Filter") use_date_filter = st.checkbox("Filter backups by date") date_from = None date_to = None if use_date_filter: date_from = st.date_input("From", value=None, key="date_from") date_to = st.date_input("To", value=None, key="date_to") st.subheader("psql Executable") psql_path = st.text_input( "Path to psql", value=_find_psql(), help="Full path to psql.exe if not auto-detected", ) if not folder_path: st.info("Enter the shared folder path in the sidebar to get started.") st.stop() if not os.path.exists(folder_path): st.error(f"Path does not exist: `{folder_path}`") st.stop() zip_files = list_zip_files(folder_path) if not zip_files: st.warning("No ZIP files found in the selected folder.") st.stop() if use_date_filter and (date_from or date_to): def _in_range(fname): d = parse_zip_date(fname) if d is None: return True if date_from and d < date_from: return False if date_to and d > date_to: return False return True zip_files = [f for f in zip_files if _in_range(f)] if not zip_files: st.warning("No ZIP files match the selected date range.") st.stop() st.subheader("Select a Backup") selected_zip = st.selectbox("Available backups", zip_files) zip_full_path = os.path.join(folder_path, selected_zip) sql_files = peek_zip(zip_full_path) if not sql_files: st.error("No .sql file found inside the selected ZIP.") st.stop() sql_filename = sql_files[0] st.info(f"SQL file inside ZIP: **{sql_filename}**") default_db = Path(sql_filename).stem.lower().replace(" ", "_") db_name = st.text_input("New Database Name", value=default_db) db_name_clean = db_name.strip() if not db_name_clean: st.warning("Please enter a database name.") st.stop() if st.button("Import to PostgreSQL", type="primary"): if not pg_password: st.error("PostgreSQL password is required.") st.stop() if not psql_path: st.error( "psql executable not found. Install PostgreSQL or set the path in the sidebar." ) st.stop() status = st.empty() progress_bar = st.progress(0) with tempfile.TemporaryDirectory() as tmpdir: status.info("Step 1/3 — Extracting SQL file from ZIP... (33%)") progress_bar.progress(33) with zipfile.ZipFile(zip_full_path, "r") as z: z.extract(sql_filename, tmpdir) extracted_sql = os.path.join(tmpdir, sql_filename) status.info(f"Step 2/3 — Creating database **{db_name_clean}**... (67%)") progress_bar.progress(67) try: create_database(pg_host, int(pg_port), pg_user, pg_password, db_name_clean) except psycopg2.errors.DuplicateDatabase: progress_bar.empty() st.error( f"Database **{db_name_clean}** already exists. " "Choose a different name or drop it manually first." ) st.stop() except psycopg2.OperationalError as e: progress_bar.empty() st.error(f"Could not connect to PostgreSQL: {e}") st.stop() except Exception as e: progress_bar.empty() st.error(f"Failed to create database: {e}") st.stop() status.info(f"Step 3/3 — Importing SQL into **{db_name_clean}** — this may take a while... (100%)") progress_bar.progress(100) result = import_sql( psql_path, pg_host, int(pg_port), pg_user, pg_password, db_name_clean, extracted_sql, ) status.empty() progress_bar.empty() if result.returncode == 0: st.success(f"Database **{db_name_clean}** created and imported successfully.") else: st.error("Import finished with errors.") if result.stderr: with st.expander("psql error output", expanded=True): st.code(result.stderr, language="text") if result.stdout: with st.expander("psql output"): st.code(result.stdout, language="text")