oc-deploy/download_oc.py

79 lines
2.4 KiB
Python
Executable File

#/bin/python3
import requests
from bs4 import BeautifulSoup
import inquirer
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from rich.console import Console
from rich.table import Table
from rich.live import Live
# URLs des pages à analyser
urls = [
'https://cloud.o-forge.io/explore/repos?page=1',
'https://cloud.o-forge.io/explore/repos?page=2'
]
def get_all_repo(urls):
repositories = []
for url in urls:
response = requests.get(url)
response.raise_for_status() # Vérifie si la requête a réussi
soup = BeautifulSoup(response.text, 'html.parser')
titles = soup.find_all(class_='flex-item-title')
for title in titles:
repo_name = title.get_text(strip=True)
if repo_name.startswith('core/'):
repositories.append(repo_name.split("core/")[1])
return repositories
def git_clone_repo(repo: str, dir: str, status: dict):
status[repo] = "⏳ Cloning..."
try:
if os.path.exists(f"{dir}/{repo}") :
subprocess.run(["git", "-C", f"{dir}/{repo}","pull"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
else:
base_url = f"https://cloud.o-forge.io/core/{repo}.git"
subprocess.run(["git", "clone", base_url, f"{dir}/{repo}"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
status[repo] = "✅ Done"
except Exception as e:
status[repo] = f"❌ Failed: {e}"
def display_status(status):
table = Table(title="Repository Cloning Status")
table.add_column("Repository", justify="left")
table.add_column("Status", justify="right")
for repo, state in status.items():
table.add_row(repo, state)
return table
repositories = get_all_repo(urls)
cwd = os.getcwd()
questions = [
inquirer.Checkbox('repo_choice',
message=f"Which Open Cloud repo do you want to download ? (Will be download in {cwd})?",
choices=repositories,
),
]
selected_repo = inquirer.prompt(questions)
status = {repo: "Waiting" for repo in selected_repo["repo_choice"]}
with ThreadPoolExecutor() as executor:
futures = {executor.submit(git_clone_repo, repo, cwd, status): repo for repo in selected_repo["repo_choice"]}
with Live(display_status(status), refresh_per_second=2) as live:
for future in as_completed(futures):
live.update(display_status(status))