From f3c85602c513a4111704515b411e9a08640fbefb Mon Sep 17 00:00:00 2001 From: Bachir souldi Date: Thu, 16 Apr 2026 16:20:16 +0100 Subject: [PATCH] Initial commit --- index.py | 252 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 252 insertions(+) create mode 100644 index.py diff --git a/index.py b/index.py new file mode 100644 index 0000000..8fee677 --- /dev/null +++ b/index.py @@ -0,0 +1,252 @@ +import streamlit as st +import os +import zipfile +import tempfile +import subprocess +import psycopg2 +from psycopg2 import sql +from pathlib import Path +from datetime import date +import re + +st.set_page_config(page_title="DB Backup Importer", layout="wide") +st.title("PostgreSQL Backup Importer") + +def _find_psql(): + """Return psql path if found in PATH or common Windows install dirs.""" + try: + result = subprocess.run( + ["psql", "--version"], capture_output=True, timeout=5 + ) + if result.returncode == 0: + return "psql" + except Exception: + pass + + for base in [ + r"C:\Program Files\PostgreSQL", + r"C:\Program Files (x86)\PostgreSQL", + ]: + if os.path.isdir(base): + for version in sorted(os.listdir(base), reverse=True): + candidate = os.path.join(base, version, "bin", "psql.exe") + if os.path.isfile(candidate): + return candidate + return "" + + +def get_connection(host, port, user, password, dbname="postgres"): + return psycopg2.connect( + host=host, port=port, user=user, password=password, dbname=dbname + ) + + +def list_zip_files(path): + try: + files = [f for f in os.listdir(path) if f.lower().endswith(".zip")] + return sorted(files, reverse=True) + except PermissionError: + st.error("Permission denied accessing the folder.") + return [] + except FileNotFoundError: + st.error("Folder not found.") + return [] + + +_ZIP_DATE_RE = re.compile( + r"bdd_sophal(\d{4})(\d{2})(\d{2})\d{4}\.zip$", re.IGNORECASE +) + + +def parse_zip_date(filename): + """Return a date for filenames matching bdd_sophalYYYYMMDDHHMM.zip, else None.""" + m = _ZIP_DATE_RE.search(filename) + if m: + try: + return date(int(m.group(1)), int(m.group(2)), int(m.group(3))) + except ValueError: + return None + return None + + +def peek_zip(zip_path): + """Return list of .sql filenames inside a ZIP.""" + with zipfile.ZipFile(zip_path, "r") as z: + return [n for n in z.namelist() if n.lower().endswith(".sql")] + + +def create_database(host, port, user, password, db_name): + conn = get_connection(host, port, user, password) + conn.autocommit = True + cur = conn.cursor() + cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(db_name))) + cur.close() + conn.close() + + +def import_sql(psql_exe, host, port, user, password, db_name, sql_file_path): + env = os.environ.copy() + env["PGPASSWORD"] = password + + cmd = [ + psql_exe, + "-h", host, + "-p", str(port), + "-U", user, + "-d", db_name, + "-f", sql_file_path, + "--echo-errors", + ] + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + env=env, + timeout=600, + ) + return result + +with st.sidebar: + st.header("Configuration") + + folder_path = st.text_input( + "Shared Folder Path", + placeholder=r"\\server\backups or Z:\backups", + help="Path to the folder containing ZIP backup files", + ) + + st.subheader("PostgreSQL Connection") + pg_host = st.text_input("Host", value="localhost") + pg_port = st.number_input("Port", value=5432, min_value=1, max_value=65535) + pg_user = st.text_input("User", value="postgres") + pg_password = st.text_input("Password", type="password") + + st.subheader("Date Filter") + use_date_filter = st.checkbox("Filter backups by date") + date_from = None + date_to = None + if use_date_filter: + date_from = st.date_input("From", value=None, key="date_from") + date_to = st.date_input("To", value=None, key="date_to") + + st.subheader("psql Executable") + psql_path = st.text_input( + "Path to psql", + value=_find_psql(), + help="Full path to psql.exe if not auto-detected", + ) + + +if not folder_path: + st.info("Enter the shared folder path in the sidebar to get started.") + st.stop() + +if not os.path.exists(folder_path): + st.error(f"Path does not exist: `{folder_path}`") + st.stop() + +zip_files = list_zip_files(folder_path) + +if not zip_files: + st.warning("No ZIP files found in the selected folder.") + st.stop() + +if use_date_filter and (date_from or date_to): + def _in_range(fname): + d = parse_zip_date(fname) + if d is None: + return True + if date_from and d < date_from: + return False + if date_to and d > date_to: + return False + return True + zip_files = [f for f in zip_files if _in_range(f)] + if not zip_files: + st.warning("No ZIP files match the selected date range.") + st.stop() + +st.subheader("Select a Backup") +selected_zip = st.selectbox("Available backups", zip_files) + +zip_full_path = os.path.join(folder_path, selected_zip) +sql_files = peek_zip(zip_full_path) + +if not sql_files: + st.error("No .sql file found inside the selected ZIP.") + st.stop() + +sql_filename = sql_files[0] +st.info(f"SQL file inside ZIP: **{sql_filename}**") + +default_db = Path(sql_filename).stem.lower().replace(" ", "_") +db_name = st.text_input("New Database Name", value=default_db) +db_name_clean = db_name.strip() + +if not db_name_clean: + st.warning("Please enter a database name.") + st.stop() + +if st.button("Import to PostgreSQL", type="primary"): + if not pg_password: + st.error("PostgreSQL password is required.") + st.stop() + + if not psql_path: + st.error( + "psql executable not found. Install PostgreSQL or set the path in the sidebar." + ) + st.stop() + + status = st.empty() + progress_bar = st.progress(0) + + with tempfile.TemporaryDirectory() as tmpdir: + + + status.info("Step 1/3 — Extracting SQL file from ZIP... (33%)") + progress_bar.progress(33) + with zipfile.ZipFile(zip_full_path, "r") as z: + z.extract(sql_filename, tmpdir) + extracted_sql = os.path.join(tmpdir, sql_filename) + + status.info(f"Step 2/3 — Creating database **{db_name_clean}**... (67%)") + progress_bar.progress(67) + try: + create_database(pg_host, int(pg_port), pg_user, pg_password, db_name_clean) + except psycopg2.errors.DuplicateDatabase: + progress_bar.empty() + st.error( + f"Database **{db_name_clean}** already exists. " + "Choose a different name or drop it manually first." + ) + st.stop() + except psycopg2.OperationalError as e: + progress_bar.empty() + st.error(f"Could not connect to PostgreSQL: {e}") + st.stop() + except Exception as e: + progress_bar.empty() + st.error(f"Failed to create database: {e}") + st.stop() + + status.info(f"Step 3/3 — Importing SQL into **{db_name_clean}** — this may take a while... (100%)") + progress_bar.progress(100) + result = import_sql( + psql_path, pg_host, int(pg_port), pg_user, pg_password, + db_name_clean, extracted_sql, + ) + + status.empty() + progress_bar.empty() + if result.returncode == 0: + st.success(f"Database **{db_name_clean}** created and imported successfully.") + else: + st.error("Import finished with errors.") + if result.stderr: + with st.expander("psql error output", expanded=True): + st.code(result.stderr, language="text") + if result.stdout: + with st.expander("psql output"): + st.code(result.stdout, language="text")