#/bin/python3 import requests from bs4 import BeautifulSoup import inquirer import os import subprocess from concurrent.futures import ThreadPoolExecutor, as_completed from rich.console import Console from rich.table import Table from rich.live import Live # URLs des pages à analyser urls = [ 'https://cloud.o-forge.io/explore/repos?page=1', 'https://cloud.o-forge.io/explore/repos?page=2' ] def get_all_repo(urls): repositories = [] for url in urls: response = requests.get(url) response.raise_for_status() # Vérifie si la requête a réussi soup = BeautifulSoup(response.text, 'html.parser') titles = soup.find_all(class_='flex-item-title') for title in titles: repo_name = title.get_text(strip=True) if repo_name.startswith('core/'): repositories.append(repo_name.split("core/")[1]) return repositories def git_clone_repo(repo: str, dir: str, status: dict): status[repo] = "⏳ Cloning..." try: if os.path.exists(f"{dir}/{repo}") : subprocess.run(["git", "-C", f"{dir}/{repo}","pull"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) else: base_url = f"https://cloud.o-forge.io/core/{repo}.git" subprocess.run(["git", "clone", base_url, f"{dir}/{repo}"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) status[repo] = "✅ Done" except Exception as e: status[repo] = f"❌ Failed: {e}" def display_status(status): table = Table(title="Repository Cloning Status") table.add_column("Repository", justify="left") table.add_column("Status", justify="right") for repo, state in status.items(): table.add_row(repo, state) return table repositories = get_all_repo(urls) cwd = os.getcwd() questions = [ inquirer.Checkbox('repo_choice', message=f"Which Open Cloud repo do you want to download ? (Will be download in {cwd})?", choices=repositories, ), ] selected_repo = inquirer.prompt(questions) status = {repo: "Waiting" for repo in selected_repo["repo_choice"]} with ThreadPoolExecutor() as executor: futures = {executor.submit(git_clone_repo, repo, cwd, status): repo for repo in selected_repo["repo_choice"]} with Live(display_status(status), refresh_per_second=2) as live: for future in as_completed(futures): live.update(display_status(status))