79 lines
2.4 KiB
Python
Executable File
79 lines
2.4 KiB
Python
Executable File
#!/bin/python3
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
import inquirer
|
|
import os
|
|
import subprocess
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
from rich.live import Live
|
|
|
|
# URLs of the pages to scan for repositories.
urls = [
    "https://cloud.o-forge.io/explore/repos?page=1",
    "https://cloud.o-forge.io/explore/repos?page=2",
]
|
|
|
|
def get_all_repo(urls, timeout=10):
    """Collect the names of the repositories listed under ``core/``.

    Each page in *urls* is downloaded and parsed as HTML. Every element with
    the CSS class ``flex-item-title`` whose text starts with ``core/`` yields
    one entry, with that leading ``core/`` removed.

    Args:
        urls: Iterable of page URLs to fetch.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        A list of repository names (without the ``core/`` prefix), in the
        order they were found.

    Raises:
        requests.HTTPError: If a page answers with an error status.
        requests.Timeout: If a page does not answer within *timeout* seconds.
    """
    prefix = 'core/'
    repositories = []

    for url in urls:
        # Without a timeout, requests waits indefinitely on a stalled server.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Fail loudly if the request did not succeed.

        soup = BeautifulSoup(response.text, 'html.parser')

        for title in soup.find_all(class_='flex-item-title'):
            repo_name = title.get_text(strip=True)
            if repo_name.startswith(prefix):
                # Strip only the leading prefix; splitting on "core/" would
                # truncate a name that happens to contain "core/" again.
                repositories.append(repo_name[len(prefix):])

    return repositories
|
|
|
|
def git_clone_repo(repo: str, dir: str, status: dict) -> None:
    """Clone *repo* into *dir*, or pull it if it is already there.

    Progress and outcome are reported by writing a human-readable string to
    ``status[repo]``; nothing is returned and no exception is propagated.

    Args:
        repo: Repository name, used both in the remote URL
            (``https://cloud.o-forge.io/core/<repo>.git``) and as the name of
            the local directory.
        dir: Parent directory that holds (or will hold) the checkout.
        status: Mapping updated in place with the state of this repository.
    """
    status[repo] = "⏳ Cloning..."
    target = f"{dir}/{repo}"

    if os.path.exists(target):
        # Already present locally: refresh it instead of cloning again.
        command = ["git", "-C", target, "pull"]
    else:
        base_url = f"https://cloud.o-forge.io/core/{repo}.git"
        command = ["git", "clone", base_url, target]

    try:
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except Exception as e:
        # Worker boundary: an exception escaping here would stay hidden in a
        # future nobody reads, so record it in the status instead
        # (typically "git" missing from PATH).
        status[repo] = f"❌ Failed: {e}"
        return

    if result.returncode != 0:
        # subprocess.run() does not raise on a non-zero exit status, so a
        # failed git command has to be detected explicitly.
        lines = result.stderr.decode(errors="replace").strip().splitlines()
        # The last stderr line is usually git's "fatal: ..." message.
        detail = lines[-1] if lines else f"git exited with status {result.returncode}"
        status[repo] = f"❌ Failed: {detail}"
        return

    status[repo] = "✅ Done"
|
|
|
|
def display_status(status):
    """Render the per-repository states as a two-column table.

    Args:
        status: Mapping of repository name to its current state string.

    Returns:
        A ``Table`` titled "Repository Cloning Status" with one row per
        entry of *status*, in the mapping's iteration order.
    """
    overview = Table(title="Repository Cloning Status")

    # Column header paired with the alignment of its cells.
    for heading, alignment in (("Repository", "left"), ("Status", "right")):
        overview.add_column(heading, justify=alignment)

    for name, current_state in status.items():
        overview.add_row(name, current_state)

    return overview
|
|
|
|
# Discover the available repositories, then let the user choose which ones
# to download into the current working directory.
repositories = get_all_repo(urls)
cwd = os.getcwd()

questions = [
    inquirer.Checkbox(
        'repo_choice',
        message=f"Which Open Cloud repo do you want to download ? (Will be download in {cwd})?",
        choices=repositories,
    ),
]

selected_repo = inquirer.prompt(questions)

# inquirer.prompt() returns None when the prompt is cancelled (e.g. Ctrl+C),
# and the checkbox answer is an empty list when nothing was ticked. Either
# way there is nothing to download, so stop before starting any worker.
chosen_repos = selected_repo["repo_choice"] if selected_repo else []
if not chosen_repos:
    print("No repository selected, nothing to download.")
    raise SystemExit(0)

status = {repo: "Waiting" for repo in chosen_repos}

with ThreadPoolExecutor() as executor:
    # One clone/pull task per selected repository; each task reports its
    # progress by writing into the shared ``status`` mapping.
    futures = {executor.submit(git_clone_repo, repo, cwd, status): repo for repo in chosen_repos}

    with Live(display_status(status), refresh_per_second=2) as live:
        # Rebuild the table every time one of the tasks finishes.
        for _ in as_completed(futures):
            live.update(display_status(status))
|