Les choses sérieuses commencent

This commit is contained in:
L0m1g 2025-04-29 17:15:19 +02:00
parent 7a9fe18463
commit c63f62721b
41 changed files with 1270 additions and 0 deletions

0
erminig/__init__.py Normal file
View file

0
erminig/cli/__init__.py Normal file
View file

54
erminig/cli/evezh.py Normal file
View file

@ -0,0 +1,54 @@
import argparse
import json
from erminig.controllers.evezh import check
from erminig.models.db import init_db
def main():
parser = argparse.ArgumentParser(description="Evezh Veille logicielle artisanale")
subparsers = parser.add_subparsers(dest="command")
init_parser = subparsers.add_parser("init", help="Initialiser la base de données")
check_parser = subparsers.add_parser("check")
check_parser.add_argument("--config")
check_parser.add_argument("--output")
check_parser.add_argument("--stdout", action="store_true")
sync_parser = subparsers.add_parser("sync")
sync_parser.add_argument("--config", required=True)
sync_parser.add_argument("--db", required=True)
args = parser.parse_args()
if args.command == "init":
init_db()
if args.command == "check":
state = check.load_state(args.output) if args.output else {}
results = check.check_versions(args.config, state)
new_state = {}
updated = []
for r in results:
name, version, url = r["name"], r["version"], r["url"]
if state.get(name, {}).get("version") != version:
updated.append(r)
new_state[name] = {"version": version, "url": url}
if args.output:
check.save_state(args.output, new_state)
if updated:
print(f"\n{len(updated)} mise(s) à jour détectée(s) :")
for r in updated:
print(f" - {r['name']}{r['version']}")
else:
print("Aucun changement détecté.")
if args.stdout:
print("\n--- Résultat complet ---")
print(json.dumps(results, indent=2))
elif args.command == "sync":
check.sync_db(args.config)

37
erminig/cli/govel.py Normal file
View file

@ -0,0 +1,37 @@
import argparse
from erminig.config import Config
from erminig.controllers.govel.pakva import Pakva
from erminig.controllers.govel.build import run_build_function
def main():
parser = argparse.ArgumentParser(description="Govel Build artisanal Erminig")
subparsers = parser.add_subparsers(dest="command")
build_parser = subparsers.add_parser("build")
build_parser.add_argument("--name", help="Nom du paquet à builder")
args = parser.parse_args()
if args.command == "build":
if args.name:
pakva = Pakva(name=args.name, version=None, archive=None)
pakva.read()
else:
if not Config.PAKVA_DIR.exists():
print("[GOVEL] Erreur : Aucun Pakva trouvé ici.")
return
pakva = Pakva(name="local", version=None, archive=None)
pakva.path = Config.PAKVA_DIR
pakva.read()
build_success = run_build_function(pakva.path)
if build_success:
print(f"[GOVEL] Build réussi pour {pakva.name}")
else:
print(f"[GOVEL] Build échoué pour {pakva.name}")
if __name__ == "__main__":
main()

56
erminig/cli/init.py Normal file
View file

@ -0,0 +1,56 @@
import os
import subprocess
import sys
from pathlib import Path
from erminig.system.security import check_root, check_user_exists
from erminig.config import Config # Voilà la différence clé !
PAK_USER = Config.PAK_USER
def create_user_pak():
"""Crée l'utilisateur pak si nécessaire."""
if check_user_exists(PAK_USER):
print(f"[INIT] Utilisateur '{PAK_USER}' existe déjà.")
return
print(f"[INIT] Création de l'utilisateur '{PAK_USER}'...")
subprocess.run(
[
"useradd",
"-r",
"-d",
str(Config.LIB_DIR),
"-s",
"/usr/sbin/nologin",
PAK_USER,
],
check=True,
)
print(f"[INIT] Utilisateur '{PAK_USER}' créé.")
def setup_directories():
"""Crée les dossiers nécessaires et assigne les permissions."""
for directory in [Config.LIB_DIR, Config.CACHE_DIR]:
if not directory.exists():
print(f"[INIT] Création du dossier {directory}...")
directory.mkdir(parents=True, exist_ok=True)
print(f"[INIT] Attribution de {directory} à '{PAK_USER}'...")
subprocess.run(
["chown", "-R", f"{PAK_USER}:{PAK_USER}", str(directory)], check=True
)
def main():
check_root()
create_user_pak()
setup_directories()
print("[INIT] Environnement Erminig initialisé avec succès.")
if __name__ == "__main__":
main()

18
erminig/config.py Normal file
View file

@ -0,0 +1,18 @@
import os
from pathlib import Path
class Config:
LIB_DIR = Path("/var/lib/erminig")
CACHE_DIR = Path("/var/cache/erminig")
BASE_DIR = Path("/opt/erminig")
DB_PATH = LIB_DIR / "erminig.db"
PAKVA_DIR = LIB_DIR / "pakva"
GOVEL_DIR = LIB_DIR / "govel"
REPO_DIR = LIB_DIR / "keo"
PAK_USER = "pak"
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
RETRY_MAX_ATTEMPTS = 2 # Pour toutes les opérations réseau
RETRY_DELAY_SECONDS = 2

View file

View file

View file

@ -0,0 +1,14 @@
from abc import ABC, abstractmethod
class UpstreamSource(ABC):
MAX_RETRIES = 2
RETRY_DELAY = 2 # secondes
def __init__(self, name, config):
self.name = name
self.config = config
@abstractmethod
def get_latest(self):
pass

View file

@ -0,0 +1,122 @@
import json
from pathlib import Path
import yaml
from erminig.controllers.evezh.parsers.github import GitHubSource
from erminig.controllers.evezh.parsers.http import HttpSource
from erminig.controllers.evezh.parsers.sourceforge import SourceForgeRSS
from erminig.handlers.versions import handle_new_version
from erminig.models.db import ErminigDB
from erminig.models import upstreams, versions
def load_state(path):
if Path(path).exists():
return json.loads(Path(path).read_text())
return {}
def save_state(path, state):
Path(path).write_text(json.dumps(state, indent=2))
def sync_db(config_path):
config = get_config(config_path)
print(config)
with ErminigDB() as db:
for proj in config:
name = proj["name"]
type_ = "http"
if "github" in proj:
type_ = "github"
url = proj["github"]
elif "sourceforge" in proj:
type_ = "sourceforge"
url = proj["sourceforge"]
else:
url = proj.get("url", "")
pattern = proj.get("pattern", "")
file = proj.get("file", None)
upstreams.upsert_upstream(db, name, type_, url, pattern, file)
print("Synchronisation terminée.")
def make_source(name, config):
if "github" in config:
return GitHubSource(name, config)
elif "sourceforge" in config:
return SourceForgeRSS(name, config)
else:
return HttpSource(name, config)
def apply_template(name, template_name, templates):
tpl = templates.get(template_name)
if not tpl:
raise ValueError(f"Template '{template_name}' non trouvé.")
return {
"url": tpl["url"].replace("@NAME@", name),
"pattern": tpl["pattern"].replace("@NAME@", name),
}
def get_config(path):
with open(path) as f:
full = yaml.safe_load(f)
templates = full.get("templates", {})
projects = full.get("projects", [])
resolved = []
for proj in projects:
name = proj["name"]
new_proj = proj.copy()
if "template" in proj:
tpl = apply_template(name, proj["template"], templates)
new_proj["url"] = tpl["url"]
new_proj["pattern"] = tpl["pattern"]
resolved.append(new_proj)
return resolved
def check_versions(config_path, state=None):
results = []
with ErminigDB() as db:
for row in upstreams.get_all_upstreams(db):
proj = dict(row)
if proj["type"] == "github":
proj["github"] = proj["url"]
elif proj["type"] == "sourceforge":
proj["sourceforge"] = proj["url"]
else:
proj["url"] = proj["url"]
upstream_id = proj["id"]
name = proj["name"]
source = make_source(name, proj)
latest = source.get_latest()
if latest:
results.append(latest)
version = latest["version"]
url = latest["url"]
handle_new_version(db, upstream_id, name, version, url)
elif state and name in state:
print(f"[{name}] Serveur HS. On garde lancienne version.")
results.append(
{
"name": name,
"version": state[name]["version"],
"url": state[name]["url"],
}
)
else:
print(f"[{name}] Aucune version détectée.")
return results

View file

@ -0,0 +1,76 @@
import re
import requests
from erminig.config import Config
from erminig.controllers.evezh.abstract import UpstreamSource
from erminig.system.retry import retry_on_failure
class GitHubSource(UpstreamSource):
@retry_on_failure()
def get_latest(self):
repo = self.config["github"]
file_template = self.config.get("file")
latest = self._get_latest_release(repo)
if not latest:
latest = self._get_latest_tag(repo)
if not latest:
print(f"[{self.name}] Aucune version détectée.")
return None
version = self.normalize_version(latest["tag"])
url = latest.get("url")
if not url and file_template:
filename = file_template.replace("${version}", version)
url = f"https://github.com/{repo}/releases/download/{latest['tag']}/{filename}"
print(f"[{self.name}] Fallback URL : {url}")
print(url)
return {"name": self.name, "version": version, "url": url or ""}
def _github_headers(self):
headers = {}
if Config.GITHUB_TOKEN:
headers["Authorization"] = f"token {Config.GITHUB_TOKEN}"
return headers
def _get_latest_release(self, repo):
r = requests.get(
f"https://api.github.com/repos/{repo}/releases",
headers=self._github_headers(),
)
r.raise_for_status()
data = r.json()
if not data:
return None
release = data[0]
for asset in release.get("assets", []):
if asset["browser_download_url"].endswith(".tar.gz"):
return {
"tag": release["tag_name"],
"url": asset["browser_download_url"],
}
return {"tag": release["tag_name"]}
def _get_latest_tag(self, repo):
r = requests.get(
f"https://api.github.com/repos/{repo}/tags", headers=self._github_headers()
)
r.raise_for_status()
tags = r.json()
if not tags:
return None
url = f"https://github.com/{repo}/archive/refs/tags/{tags[0]["name"]}.tar.gz"
return {"tag": tags[0]["name"], "url": url}
def normalize_version(self, tag):
# Exemples : v2.7.1 → 2.7.1, R_2_7_1 → 2.7.1, expat-2.7.1 → 2.7.1
tag = tag.strip()
if tag.lower().startswith(("v", "r_", "r")):
tag = re.sub(r"^[vVrR_]+", "", tag)
tag = tag.replace("_", ".")
match = re.search(r"(\d+\.\d+(?:\.\d+)?)", tag)
return match.group(1) if match else tag

View file

@ -0,0 +1,45 @@
import re
import requests
from erminig.controllers.evezh.abstract import UpstreamSource
from erminig.system.retry import retry_on_failure
class HttpSource(UpstreamSource):
@retry_on_failure()
def get_latest(self):
base_url = self.config["url"]
pattern = self.config["pattern"]
file_pattern = self.config.get("file")
response = requests.get(
base_url, timeout=10
) # timeout pour éviter de bloquer éternellement
response.raise_for_status()
html = response.text
matches = re.findall(pattern, html)
if not matches:
print(f"[{self.name}] Aucun match avec pattern : {pattern}")
return None
latest_dir = sorted(set(matches), key=self.version_key)[-1]
version = self.extract_version(latest_dir)
if file_pattern:
filename = file_pattern.replace("${version}", version)
url = f"{base_url}{latest_dir}{filename}"
else:
url = f"{base_url}{latest_dir}"
print(url)
return {"name": self.name, "version": version, "url": url}
def extract_version(self, path):
match = re.search(r"([0-9]+\.[0-9]+(?:\.[0-9]+)?)", path)
return match.group(1) if match else path
def version_key(self, ver):
nums = re.findall(r"\d+", ver)
return list(map(int, nums))

View file

@ -0,0 +1,33 @@
import re
import requests
import xml.etree.ElementTree as ET
from erminig.controllers.evezh.abstract import UpstreamSource
from erminig.system.retry import retry_on_failure
class SourceForgeRSS(UpstreamSource):
@retry_on_failure()
def get_latest(self):
rss_url = self.config["sourceforge"]
r = requests.get(rss_url, timeout=10)
r.raise_for_status()
root = ET.fromstring(r.text)
items = root.findall(".//item")
versions = []
for item in items:
title = item.findtext("title") or ""
match = re.search(r"([0-9]+\.[0-9]+(?:\.[0-9]+)?)", title)
if match:
version = match.group(1)
link = item.findtext("link")
versions.append((version, link))
if not versions:
print(f"[{self.name}] Aucune version trouvée via RSS.")
return None
latest = sorted(versions, key=lambda x: list(map(int, x[0].split("."))))[-1]
print(latest[1])
return {"name": self.name, "version": latest[0], "url": latest[1]}

View file

View file

@ -0,0 +1,33 @@
import subprocess
from erminig.system.security import check_root, check_user_exists, run_as_user
check_root
check_user_exists("pak")
@run_as_user("pak")
def run_build_function(pakva_path):
"""
Exécute la fonction build() du fichier Pakva donné.
"""
try:
result = subprocess.run(
f"""
set -e
source "{pakva_path}"
build
""",
shell=True,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
executable="/bin/bash",
text=True,
)
print(f"[BUILD] Succès : {pakva_path.name}")
print(result.stdout)
return True
except subprocess.CalledProcessError as e:
print(f"[BUILD] Échec : {pakva_path.name}")
print(e.stderr)
return False

View file

@ -0,0 +1,103 @@
from pathlib import Path
from erminig.config import Config
from erminig.system.security import run_as_user
class Pakva:
def __init__(self, name, version, archive):
self.name = name
self.version = version
self.archive = archive
self.path = Config.GOVEL_DIR / name[0] / name / "Pakva"
@run_as_user("pak")
def save(self):
self.path.parent.mkdir(parents=True, exist_ok=True)
with open(self.path, "w") as f:
f.write(
f"""\
name="{self.name}"
version="{self.version}"
revision=1
source=("{self.archive}")
build() {{
echo "Build function not implemented"
}}
pak() {{
echo "Packaging function not implemented"
}}
"""
)
@run_as_user("pak")
def update_version(self, version, archive, reset_revision=False):
if not self.path.exists():
raise FileNotFoundError(f"Aucun fichier Pakva pour {self.name}")
with open(self.path, "r") as f:
lines = f.readlines()
new_lines = []
updated_version = False
updated_source = False
for line in lines:
if line.startswith("version=") and not updated_version:
new_lines.append(f'version="{version}"\n')
updated_version = True
elif line.startswith("source=") and not updated_source:
new_lines.append(f'source=("{archive}")\n')
updated_source = True
elif reset_revision and line.startswith("revision="):
new_lines.append("revision=1\n")
else:
new_lines.append(line)
# Ajout si jamais les champs n'existaient pas
if not updated_version:
new_lines.insert(1, f'version="{version}"\n')
if not updated_source:
new_lines.append(f'source=("{archive}")\n')
if reset_revision and not any(line.startswith("revision=") for line in lines):
new_lines.insert(2, "revision=1\n")
with open(self.path, "w") as f:
f.writelines(new_lines)
self.version = version
self.archive = archive
print(f"[Pakva] Version mise à jour pour {self.name} -> {version}")
@classmethod
def read(cls, path):
"""
Lit un fichier Pakva et retourne un objet Pakva.
"""
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"Fichier Pakva introuvable : {path}")
with path.open() as f:
lines = f.readlines()
name = None
version = None
archive = None
for line in lines:
if line.startswith("name="):
name = line.split("=")[1].strip().strip('"')
elif line.startswith("version="):
version = line.split("=")[1].strip().strip('"')
elif line.startswith("source=("):
archive = line.split("(")[1].split(")")[0].strip().strip('"')
obj = cls(name, version, archive)
obj.path = path
return obj
def __repr__(self):
return f"<Pakva {self.name}-{self.version}>"

View file

View file

@ -0,0 +1,23 @@
from erminig.models import versions
from erminig.controllers.govel.pakva import Pakva
def handle_new_version(db, upstream_id, name, version, url):
"""
Gère l'arrivée d'une nouvelle version :
- Insère en base si absente
- Crée ou met à jour le fichier Pakva correspondant
"""
inserted = versions.insert_version_if_new(db, upstream_id, version, url)
if not inserted:
return False
pakva = Pakva(name, version, url)
if pakva.path.exists():
pakva.update_version(version, url, reset_revision=True)
print(f"[PAKVA] Mis à jour pour {name}")
else:
pakva.save()
print(f"[PAKVA] Créé pour {name}")
return True

View file

26
erminig/models/db.py Normal file
View file

@ -0,0 +1,26 @@
import sqlite3
from erminig.config import Config
def init_db():
if Config.DB_PATH.exists():
return
conn = sqlite3.connect(Config.DB_PATH)
with open(Config.BASE_DIR / "schema.sql", "r") as f:
conn.executescript(f.read())
conn.commit()
conn.close()
print("Base erminig.db initialisée avec succès.")
class ErminigDB:
def __enter__(self):
self.conn = sqlite3.connect(Config.DB_PATH)
self.conn.row_factory = sqlite3.Row
self.cursor = self.conn.cursor()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.conn.commit()
self.conn.close()

View file

@ -0,0 +1,29 @@
from erminig.models.db import ErminigDB
def upsert_upstream(db: ErminigDB, name, type_, url, pattern, file):
db.cursor.execute(
"""
INSERT INTO upstreams (name, type, url, pattern, file, created_at)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(name) DO UPDATE SET
type=excluded.type,
url=excluded.url,
pattern=excluded.pattern,
file=excluded.file
RETURNING id
""",
(name, type_, url, pattern, file),
)
row = db.cursor.fetchone()
return row[0] if row else None
def get_all_upstreams(db: ErminigDB):
db.cursor.execute(
"""
SELECT id, name, type, url, pattern, file FROM upstreams
"""
)
rows = db.cursor.fetchall()
return rows

View file

@ -0,0 +1,25 @@
from erminig.models.db import ErminigDB
def insert_version_if_new(db, upstream_id, version, url):
db.cursor.execute(
"""
SELECT id FROM versions
WHERE upstream_id = ? AND version = ?
""",
(upstream_id, version),
)
if db.cursor.fetchone():
return False # déjà en base
db.cursor.execute(
"""
INSERT INTO versions (upstream_id, version, url)
VALUES (?, ?, ?)
""",
(upstream_id, version, url),
)
print(f"[DB] Nouvelle version insérée pour {upstream_id} : {version}")
return True

View file

34
erminig/system/retry.py Normal file
View file

@ -0,0 +1,34 @@
import time
import requests
from erminig.config import Config
def retry_on_failure():
def decorator(func):
def wrapper(*args, **kwargs):
attempt = 1
while attempt <= Config.RETRY_MAX_ATTEMPTS + 1:
try:
return func(*args, **kwargs)
except requests.exceptions.RequestException as e:
name = getattr(args[0], "name", "Unknown")
print(
f"[{name}] Erreur réseau tentative {attempt}: {e.__class__.__name__}"
)
if attempt <= Config.RETRY_MAX_ATTEMPTS:
print(
f"[{name}] Nouvelle tentative dans {Config.RETRY_DELAY_SECONDS}s..."
)
time.sleep(Config.RETRY_DELAY_SECONDS)
attempt += 1
else:
print(f"[{name}] Abandon après {attempt} tentatives.")
return None
except Exception as e:
name = getattr(args[0], "name", "Unknown")
print(f"[{name}] Erreur inconnue dans {func.__name__}: {e}")
return None
return wrapper
return decorator

View file

@ -0,0 +1,55 @@
import os
import pwd
import sys
import functools
def check_root():
"""Vérifie si on est root, sinon quitte."""
if os.geteuid() != 0:
print("[SECURITY] Ce programme doit être exécuté en tant que root.")
sys.exit(1)
def check_user_exists(username):
"""Vérifie si l'utilisateur spécifié existe."""
try:
pwd.getpwnam(username)
return True
except KeyError:
print(f"[SECURITY] Utilisateur '{username}' introuvable.")
return False
def run_as_user(username):
"""Décorateur : Fork et drop privileges pour exécuter une fonction sous un autre utilisateur."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
pid = os.fork()
if pid > 0:
# Parent
_, status = os.waitpid(pid, 0)
return os.WEXITSTATUS(status)
# Child
pw_record = pwd.getpwnam(username)
user_uid = pw_record.pw_uid
user_gid = pw_record.pw_gid
os.setgid(user_gid)
os.setuid(user_uid)
# Exécuter la fonction sous l'utilisateur demandé
result = func(*args, **kwargs)
sys.exit(0 if result is None else int(bool(result)))
except OSError as e:
print(f"[SECURITY] Fork échoué : {e}")
sys.exit(1)
return wrapper
return decorator