Files
ERP-DB-IMPORTER/index.py
Bachir souldi f3c85602c5 Initial commit
2026-04-16 16:20:16 +01:00

253 lines
7.5 KiB
Python

import streamlit as st
import os
import zipfile
import tempfile
import subprocess
import psycopg2
from psycopg2 import sql
from pathlib import Path
from datetime import date
import re
# Page-level Streamlit setup: configure the page title and a wide layout,
# then render the main heading of the importer.
st.set_page_config(page_title="DB Backup Importer", layout="wide")
st.title("PostgreSQL Backup Importer")
def _find_psql(search_dirs=None):
    """Locate a usable ``psql`` executable.

    The lookup first probes ``psql --version`` through PATH. If that fails,
    each directory in *search_dirs* is scanned for
    ``<version>/bin/psql.exe``, trying the highest version number first.

    Args:
        search_dirs: Optional iterable of PostgreSQL installation roots to
            scan. Defaults to the common Windows install directories.

    Returns:
        ``"psql"`` when it is runnable from PATH, the full path of the first
        ``psql.exe`` found otherwise, or ``""`` when nothing was found.
    """
    try:
        probe = subprocess.run(
            ["psql", "--version"], capture_output=True, timeout=5
        )
    except (OSError, subprocess.SubprocessError):
        # psql is not on PATH, is not executable, or the probe timed out.
        probe = None
    if probe is not None and probe.returncode == 0:
        return "psql"

    if search_dirs is None:
        search_dirs = [
            r"C:\Program Files\PostgreSQL",
            r"C:\Program Files (x86)\PostgreSQL",
        ]

    def _version_key(name):
        # Compare version folders numerically so that "16" outranks "9.6";
        # a plain string sort would put "9.6" first.
        return tuple(int(part) for part in re.findall(r"\d+", name))

    for base in search_dirs:
        try:
            versions = os.listdir(base)
        except OSError:
            # Missing, unreadable, or not a directory: try the next root.
            continue
        for version in sorted(versions, key=_version_key, reverse=True):
            candidate = os.path.join(base, version, "bin", "psql.exe")
            if os.path.isfile(candidate):
                return candidate
    return ""
def get_connection(host, port, user, password, dbname="postgres"):
    """Open and return a psycopg2 connection.

    Connects to *dbname* (``"postgres"`` unless overridden) on the server at
    *host*:*port* using the supplied credentials.
    """
    connect_args = {
        "host": host,
        "port": port,
        "user": user,
        "password": password,
        "dbname": dbname,
    }
    return psycopg2.connect(**connect_args)
def list_zip_files(path):
    """Return the ZIP file names found directly inside *path*.

    Only regular files whose name ends in ``.zip`` (case-insensitive) are
    returned, sorted in descending name order. Any failure to read the
    folder is reported through ``st.error`` and yields an empty list
    instead of raising.

    Args:
        path: Folder to scan.

    Returns:
        A list of file names (not full paths); empty on error.
    """
    try:
        names = os.listdir(path)
    except PermissionError:
        st.error("Permission denied accessing the folder.")
        return []
    except FileNotFoundError:
        st.error("Folder not found.")
        return []
    except OSError as exc:
        # Covers a path that is a file (NotADirectoryError) and other I/O
        # errors, e.g. an unreachable network share.
        st.error(f"Could not read the folder: {exc}")
        return []

    zip_names = (
        name
        for name in names
        if name.lower().endswith(".zip")
        # Skip directories that merely happen to be named "*.zip".
        and os.path.isfile(os.path.join(path, name))
    )
    return sorted(zip_names, reverse=True)
# Matches backup names such as bdd_sophal202604161620.zip and captures the
# year, month and day; the trailing four digits (HHMM) are not captured.
_ZIP_DATE_RE = re.compile(
    r"bdd_sophal(\d{4})(\d{2})(\d{2})\d{4}\.zip$", re.IGNORECASE
)


def parse_zip_date(filename):
    """Extract the backup date from a ``bdd_sophalYYYYMMDDHHMM.zip`` name.

    Returns a ``date`` built from the year, month and day in *filename*, or
    ``None`` when the name does not match the pattern or the digits do not
    form a valid calendar date.
    """
    match = _ZIP_DATE_RE.search(filename)
    if match is None:
        return None
    try:
        year, month, day = map(int, match.groups())
        return date(year, month, day)
    except ValueError:
        return None
def peek_zip(zip_path):
    """List the ``.sql`` members of the ZIP archive at *zip_path*.

    Member names are returned in archive order; the ``.sql`` suffix is
    matched case-insensitively.
    """
    sql_members = []
    with zipfile.ZipFile(zip_path, "r") as archive:
        for member in archive.namelist():
            if member.lower().endswith(".sql"):
                sql_members.append(member)
    return sql_members
def create_database(host, port, user, password, db_name):
    """Create a new PostgreSQL database named *db_name*.

    Connects through ``get_connection`` (to its default database) and issues
    ``CREATE DATABASE`` with the name quoted as an SQL identifier. The cursor
    and connection are always closed, including when the statement fails.

    Raises:
        psycopg2.Error: Propagated unchanged from the connection attempt or
            from the ``CREATE DATABASE`` statement (for example when the
            database already exists).
    """
    conn = get_connection(host, port, user, password)
    try:
        # Autocommit is enabled so the statement is not wrapped in a
        # transaction; PostgreSQL does not allow CREATE DATABASE inside one.
        conn.autocommit = True
        cur = conn.cursor()
        try:
            cur.execute(
                sql.SQL("CREATE DATABASE {}").format(sql.Identifier(db_name))
            )
        finally:
            cur.close()
    finally:
        conn.close()
def import_sql(psql_exe, host, port, user, password, db_name, sql_file_path,
               timeout=600, stop_on_error=False):
    """Run a SQL file against a database using the ``psql`` command-line tool.

    The password is handed to psql through the ``PGPASSWORD`` variable of a
    copied environment, so the current process environment is not modified.

    Args:
        psql_exe: Name or path of the psql executable.
        host: Server host name.
        port: Server port (converted to a string for the command line).
        user: PostgreSQL user name.
        password: Password for *user*.
        db_name: Target database.
        sql_file_path: Path of the SQL script to execute.
        timeout: Maximum run time in seconds; ``None`` disables the limit.
            Defaults to 600, the previously hard-coded value.
        stop_on_error: When true, pass ``-v ON_ERROR_STOP=1`` so psql aborts
            at the first failing statement and exits with a non-zero status.
            Defaults to ``False``, which keeps the original command line.

    Returns:
        The ``subprocess.CompletedProcess`` with text ``stdout``/``stderr``.

    Raises:
        subprocess.TimeoutExpired: If psql runs longer than *timeout*.
        OSError: If *psql_exe* cannot be executed.
    """
    env = os.environ.copy()
    env["PGPASSWORD"] = password
    cmd = [
        psql_exe,
        "-h", host,
        "-p", str(port),
        "-U", user,
        "-d", db_name,
        "-f", sql_file_path,
        "--echo-errors",
    ]
    if stop_on_error:
        cmd += ["-v", "ON_ERROR_STOP=1"]
    return subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        env=env,
        timeout=timeout,
    )
# Sidebar: collects the backup folder, PostgreSQL connection settings,
# the optional date filter and the psql executable path.
with st.sidebar:
    st.header("Configuration")
    folder_path = st.text_input(
        "Shared Folder Path",
        placeholder=r"\\server\backups or Z:\backups",
        help="Path to the folder containing ZIP backup files",
    )

    st.subheader("PostgreSQL Connection")
    pg_host = st.text_input("Host", value="localhost")
    pg_port = st.number_input("Port", value=5432, min_value=1, max_value=65535)
    pg_user = st.text_input("User", value="postgres")
    pg_password = st.text_input("Password", type="password")

    st.subheader("Date Filter")
    use_date_filter = st.checkbox("Filter backups by date")
    date_from = None
    date_to = None
    if use_date_filter:
        date_from = st.date_input("From", value=None, key="date_from")
        date_to = st.date_input("To", value=None, key="date_to")

    st.subheader("psql Executable")
    # _find_psql() launches a subprocess, and this script is re-executed on
    # every widget interaction, so detect the default once per session.
    if "_psql_default_path" not in st.session_state:
        st.session_state["_psql_default_path"] = _find_psql()
    psql_path = st.text_input(
        "Path to psql",
        value=st.session_state["_psql_default_path"],
        help="Full path to psql.exe if not auto-detected",
    )
# Validate the configured folder, list its ZIP backups and apply the
# optional date filter. Each failed check shows a message and stops the run.
if not folder_path:
    st.info("Enter the shared folder path in the sidebar to get started.")
    st.stop()
if not os.path.exists(folder_path):
    st.error(f"Path does not exist: `{folder_path}`")
    st.stop()
if not os.path.isdir(folder_path):
    # A path to a regular file would otherwise make the folder listing fail.
    st.error(f"Path is not a folder: `{folder_path}`")
    st.stop()

zip_files = list_zip_files(folder_path)
if not zip_files:
    st.warning("No ZIP files found in the selected folder.")
    st.stop()

if use_date_filter and (date_from or date_to):
    def _in_range(fname):
        """Return True when *fname* should be kept by the date filter."""
        d = parse_zip_date(fname)
        if d is None:
            # Names without a recognizable date are kept, not hidden.
            return True
        if date_from and d < date_from:
            return False
        if date_to and d > date_to:
            return False
        return True

    zip_files = [f for f in zip_files if _in_range(f)]
    if not zip_files:
        st.warning("No ZIP files match the selected date range.")
        st.stop()
# Let the user pick a backup, locate the SQL script inside it and choose
# the name of the database to create.
st.subheader("Select a Backup")
selected_zip = st.selectbox("Available backups", zip_files)
zip_full_path = os.path.join(folder_path, selected_zip)

try:
    sql_files = peek_zip(zip_full_path)
except (zipfile.BadZipFile, OSError) as exc:
    # A corrupt or unreadable archive is reported instead of crashing.
    st.error(f"Could not read ZIP file `{selected_zip}`: {exc}")
    st.stop()
if not sql_files:
    st.error("No .sql file found inside the selected ZIP.")
    st.stop()

# Only the first .sql member of the archive is imported.
sql_filename = sql_files[0]
st.info(f"SQL file inside ZIP: **{sql_filename}**")

# Default database name: the SQL file's base name, lower-cased, with
# spaces replaced by underscores.
default_db = Path(sql_filename).stem.lower().replace(" ", "_")
db_name = st.text_input("New Database Name", value=default_db)
db_name_clean = db_name.strip()
if not db_name_clean:
    st.warning("Please enter a database name.")
    st.stop()
# Import action: extract the SQL file to a temporary folder, create the
# target database, then feed the script to psql and report the outcome.
if st.button("Import to PostgreSQL", type="primary"):
    if not pg_password:
        st.error("PostgreSQL password is required.")
        st.stop()
    if not psql_path:
        st.error(
            "psql executable not found. Install PostgreSQL or set the path in the sidebar."
        )
        st.stop()

    status = st.empty()
    progress_bar = st.progress(0)

    # The extracted script lives in a temporary folder that is removed when
    # the block exits, including when st.stop() interrupts the run.
    with tempfile.TemporaryDirectory() as tmpdir:
        status.info("Step 1/3 — Extracting SQL file from ZIP... (33%)")
        progress_bar.progress(33)
        try:
            with zipfile.ZipFile(zip_full_path, "r") as z:
                # extract() returns the normalized path it actually wrote,
                # which can differ from joining tmpdir and the member name.
                extracted_sql = z.extract(sql_filename, tmpdir)
        except (zipfile.BadZipFile, OSError) as e:
            status.empty()
            progress_bar.empty()
            st.error(f"Failed to extract SQL file from ZIP: {e}")
            st.stop()

        status.info(f"Step 2/3 — Creating database **{db_name_clean}**... (67%)")
        progress_bar.progress(67)
        try:
            create_database(pg_host, int(pg_port), pg_user, pg_password, db_name_clean)
        except psycopg2.errors.DuplicateDatabase:
            status.empty()
            progress_bar.empty()
            st.error(
                f"Database **{db_name_clean}** already exists. "
                "Choose a different name or drop it manually first."
            )
            st.stop()
        except psycopg2.OperationalError as e:
            status.empty()
            progress_bar.empty()
            st.error(f"Could not connect to PostgreSQL: {e}")
            st.stop()
        except Exception as e:
            status.empty()
            progress_bar.empty()
            st.error(f"Failed to create database: {e}")
            st.stop()

        status.info(f"Step 3/3 — Importing SQL into **{db_name_clean}** — this may take a while... (100%)")
        progress_bar.progress(100)
        try:
            result = import_sql(
                psql_path, pg_host, int(pg_port), pg_user, pg_password,
                db_name_clean, extracted_sql,
            )
        except subprocess.TimeoutExpired:
            status.empty()
            progress_bar.empty()
            st.error(
                f"Import timed out. Database **{db_name_clean}** was created "
                "but may be only partially imported."
            )
            st.stop()
        except OSError as e:
            # For example, the configured psql path does not exist.
            status.empty()
            progress_bar.empty()
            st.error(f"Could not run psql (`{psql_path}`): {e}")
            st.stop()

    status.empty()
    progress_bar.empty()

    if result.returncode == 0:
        st.success(f"Database **{db_name_clean}** created and imported successfully.")
    else:
        st.error("Import finished with errors.")
    if result.stderr:
        with st.expander("psql error output", expanded=True):
            st.code(result.stderr, language="text")
    if result.stdout:
        with st.expander("psql output"):
            st.code(result.stdout, language="text")