Initial commit

This commit is contained in:
Bachir souldi
2026-04-16 16:20:16 +01:00
commit f3c85602c5

252
index.py Normal file
View File

@@ -0,0 +1,252 @@
import streamlit as st
import os
import zipfile
import tempfile
import subprocess
import psycopg2
from psycopg2 import sql
from pathlib import Path
from datetime import date
import re
st.set_page_config(page_title="DB Backup Importer", layout="wide")
st.title("PostgreSQL Backup Importer")
def _find_psql():
"""Return psql path if found in PATH or common Windows install dirs."""
try:
result = subprocess.run(
["psql", "--version"], capture_output=True, timeout=5
)
if result.returncode == 0:
return "psql"
except Exception:
pass
for base in [
r"C:\Program Files\PostgreSQL",
r"C:\Program Files (x86)\PostgreSQL",
]:
if os.path.isdir(base):
for version in sorted(os.listdir(base), reverse=True):
candidate = os.path.join(base, version, "bin", "psql.exe")
if os.path.isfile(candidate):
return candidate
return ""
def get_connection(host, port, user, password, dbname="postgres"):
return psycopg2.connect(
host=host, port=port, user=user, password=password, dbname=dbname
)
def list_zip_files(path):
try:
files = [f for f in os.listdir(path) if f.lower().endswith(".zip")]
return sorted(files, reverse=True)
except PermissionError:
st.error("Permission denied accessing the folder.")
return []
except FileNotFoundError:
st.error("Folder not found.")
return []
_ZIP_DATE_RE = re.compile(
r"bdd_sophal(\d{4})(\d{2})(\d{2})\d{4}\.zip$", re.IGNORECASE
)
def parse_zip_date(filename):
"""Return a date for filenames matching bdd_sophalYYYYMMDDHHMM.zip, else None."""
m = _ZIP_DATE_RE.search(filename)
if m:
try:
return date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
except ValueError:
return None
return None
def peek_zip(zip_path):
"""Return list of .sql filenames inside a ZIP."""
with zipfile.ZipFile(zip_path, "r") as z:
return [n for n in z.namelist() if n.lower().endswith(".sql")]
def create_database(host, port, user, password, db_name):
conn = get_connection(host, port, user, password)
conn.autocommit = True
cur = conn.cursor()
cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(db_name)))
cur.close()
conn.close()
def import_sql(psql_exe, host, port, user, password, db_name, sql_file_path):
env = os.environ.copy()
env["PGPASSWORD"] = password
cmd = [
psql_exe,
"-h", host,
"-p", str(port),
"-U", user,
"-d", db_name,
"-f", sql_file_path,
"--echo-errors",
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
env=env,
timeout=600,
)
return result
with st.sidebar:
st.header("Configuration")
folder_path = st.text_input(
"Shared Folder Path",
placeholder=r"\\server\backups or Z:\backups",
help="Path to the folder containing ZIP backup files",
)
st.subheader("PostgreSQL Connection")
pg_host = st.text_input("Host", value="localhost")
pg_port = st.number_input("Port", value=5432, min_value=1, max_value=65535)
pg_user = st.text_input("User", value="postgres")
pg_password = st.text_input("Password", type="password")
st.subheader("Date Filter")
use_date_filter = st.checkbox("Filter backups by date")
date_from = None
date_to = None
if use_date_filter:
date_from = st.date_input("From", value=None, key="date_from")
date_to = st.date_input("To", value=None, key="date_to")
st.subheader("psql Executable")
psql_path = st.text_input(
"Path to psql",
value=_find_psql(),
help="Full path to psql.exe if not auto-detected",
)
if not folder_path:
st.info("Enter the shared folder path in the sidebar to get started.")
st.stop()
if not os.path.exists(folder_path):
st.error(f"Path does not exist: `{folder_path}`")
st.stop()
zip_files = list_zip_files(folder_path)
if not zip_files:
st.warning("No ZIP files found in the selected folder.")
st.stop()
if use_date_filter and (date_from or date_to):
def _in_range(fname):
d = parse_zip_date(fname)
if d is None:
return True
if date_from and d < date_from:
return False
if date_to and d > date_to:
return False
return True
zip_files = [f for f in zip_files if _in_range(f)]
if not zip_files:
st.warning("No ZIP files match the selected date range.")
st.stop()
st.subheader("Select a Backup")
selected_zip = st.selectbox("Available backups", zip_files)
zip_full_path = os.path.join(folder_path, selected_zip)
sql_files = peek_zip(zip_full_path)
if not sql_files:
st.error("No .sql file found inside the selected ZIP.")
st.stop()
sql_filename = sql_files[0]
st.info(f"SQL file inside ZIP: **{sql_filename}**")
default_db = Path(sql_filename).stem.lower().replace(" ", "_")
db_name = st.text_input("New Database Name", value=default_db)
db_name_clean = db_name.strip()
if not db_name_clean:
st.warning("Please enter a database name.")
st.stop()
if st.button("Import to PostgreSQL", type="primary"):
if not pg_password:
st.error("PostgreSQL password is required.")
st.stop()
if not psql_path:
st.error(
"psql executable not found. Install PostgreSQL or set the path in the sidebar."
)
st.stop()
status = st.empty()
progress_bar = st.progress(0)
with tempfile.TemporaryDirectory() as tmpdir:
status.info("Step 1/3 — Extracting SQL file from ZIP... (33%)")
progress_bar.progress(33)
with zipfile.ZipFile(zip_full_path, "r") as z:
z.extract(sql_filename, tmpdir)
extracted_sql = os.path.join(tmpdir, sql_filename)
status.info(f"Step 2/3 — Creating database **{db_name_clean}**... (67%)")
progress_bar.progress(67)
try:
create_database(pg_host, int(pg_port), pg_user, pg_password, db_name_clean)
except psycopg2.errors.DuplicateDatabase:
progress_bar.empty()
st.error(
f"Database **{db_name_clean}** already exists. "
"Choose a different name or drop it manually first."
)
st.stop()
except psycopg2.OperationalError as e:
progress_bar.empty()
st.error(f"Could not connect to PostgreSQL: {e}")
st.stop()
except Exception as e:
progress_bar.empty()
st.error(f"Failed to create database: {e}")
st.stop()
status.info(f"Step 3/3 — Importing SQL into **{db_name_clean}** — this may take a while... (100%)")
progress_bar.progress(100)
result = import_sql(
psql_path, pg_host, int(pg_port), pg_user, pg_password,
db_name_clean, extracted_sql,
)
status.empty()
progress_bar.empty()
if result.returncode == 0:
st.success(f"Database **{db_name_clean}** created and imported successfully.")
else:
st.error("Import finished with errors.")
if result.stderr:
with st.expander("psql error output", expanded=True):
st.code(result.stderr, language="text")
if result.stdout:
with st.expander("psql output"):
st.code(result.stdout, language="text")