make ci catalog
This commit is contained in:
@@ -13,13 +13,22 @@ package infrastructure
|
||||
//
|
||||
// Environment variables (all optional):
|
||||
//
|
||||
// DOCKER_SCRAPER_ENABLED true | false (default: true)
|
||||
// DOCKER_SCRAPER_IMAGES comma-separated list of images to track.
|
||||
// Format: "name" for official library images,
|
||||
// "org/name" for user/org images.
|
||||
// Default: a curated set of popular official images.
|
||||
// DOCKER_SCRAPER_MAX_TAGS max tags to import per image (default: 10)
|
||||
// DOCKER_SCRAPER_INTERVAL_H refresh interval in hours (default: 24)
|
||||
// DOCKER_SCRAPER_ENABLED true | false (default: true)
|
||||
// DOCKER_SCRAPER_IMAGES comma-separated list of images to track.
|
||||
// Format: "name" for official library images,
|
||||
// "org/name" for user/org images.
|
||||
// Default: a curated set of popular official images.
|
||||
// DOCKER_SCRAPER_MAX_TAGS max tags to import per image (default: 10)
|
||||
// DOCKER_SCRAPER_INTERVAL_H refresh interval in hours (default: 24)
|
||||
//
|
||||
// Network endpoints (override to use a corporate proxy or private mirror):
|
||||
//
|
||||
// DOCKER_SCRAPER_HUB_URL Docker Hub API base URL (default: https://hub.docker.com)
|
||||
// DOCKER_SCRAPER_REGISTRY_URL Docker registry base URL (default: https://registry-1.docker.io)
|
||||
// DOCKER_SCRAPER_AUTH_URL Docker auth service URL (default: https://auth.docker.io)
|
||||
//
|
||||
// If none of the three endpoints are reachable at startup the scraper disables
|
||||
// itself automatically rather than spinning with repeated timeouts.
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
@@ -56,6 +65,9 @@ type scraperConfig struct {
|
||||
Images []DockerImageSpec
|
||||
MaxTags int
|
||||
IntervalHours int
|
||||
HubURL string // base URL for the Docker Hub API
|
||||
RegistryURL string // base URL for the Docker registry API
|
||||
AuthURL string // base URL for the Docker auth service
|
||||
}
|
||||
|
||||
// defaultImages is the baseline catalog seeded when DOCKER_SCRAPER_IMAGES is not set.
|
||||
@@ -241,11 +253,21 @@ var defaultImages = []DockerImageSpec{
|
||||
// scraperConfigFromEnv reads scraper configuration from environment variables
|
||||
// and returns a populated scraperConfig with sensible defaults.
|
||||
func scraperConfigFromEnv() scraperConfig {
|
||||
envOrDefault := func(key, def string) string {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
return strings.TrimRight(v, "/")
|
||||
}
|
||||
return def
|
||||
}
|
||||
|
||||
cfg := scraperConfig{
|
||||
Enabled: true,
|
||||
MaxTags: 10,
|
||||
IntervalHours: 24,
|
||||
Images: defaultImages,
|
||||
HubURL: envOrDefault("DOCKER_SCRAPER_HUB_URL", "https://hub.docker.com"),
|
||||
RegistryURL: envOrDefault("DOCKER_SCRAPER_REGISTRY_URL", "https://registry-1.docker.io"),
|
||||
AuthURL: envOrDefault("DOCKER_SCRAPER_AUTH_URL", "https://auth.docker.io"),
|
||||
}
|
||||
|
||||
if v := os.Getenv("DOCKER_SCRAPER_ENABLED"); v == "false" {
|
||||
@@ -306,6 +328,150 @@ type hubTagsResponse struct {
|
||||
Results []hubTag `json:"results"`
|
||||
}
|
||||
|
||||
// registryClient is shared across all registry API calls with a sensible timeout.
|
||||
var registryClient = &http.Client{Timeout: 15 * time.Second}
|
||||
|
||||
// ─── Docker Registry API (manifest / config) ──────────────────────────────────
|
||||
|
||||
type registryTokenResponse struct {
|
||||
Token string `json:"token"`
|
||||
}
|
||||
|
||||
// registryManifestOrList decodes either a manifest list (multi-arch) or a plain
|
||||
// image manifest. We check MediaType to tell them apart.
|
||||
type registryManifestOrList struct {
|
||||
MediaType string `json:"mediaType"`
|
||||
SchemaVersion int `json:"schemaVersion"`
|
||||
// manifest list fields
|
||||
Manifests []struct {
|
||||
Digest string `json:"digest"`
|
||||
Platform struct {
|
||||
OS string `json:"os"`
|
||||
Arch string `json:"architecture"`
|
||||
} `json:"platform"`
|
||||
} `json:"manifests"`
|
||||
// v2 image manifest fields
|
||||
Config struct {
|
||||
Digest string `json:"digest"`
|
||||
} `json:"config"`
|
||||
}
|
||||
|
||||
type registryImageConfig struct {
|
||||
Config struct {
|
||||
Cmd []string `json:"Cmd"`
|
||||
Entrypoint []string `json:"Entrypoint"`
|
||||
} `json:"config"`
|
||||
}
|
||||
|
||||
// fetchRegistryToken obtains an anonymous Bearer token for the given repository.
|
||||
// authURL is the base URL of the auth service (e.g. https://auth.docker.io).
|
||||
// registryURL is used to derive the service name expected by the auth endpoint.
|
||||
func fetchRegistryToken(namespace, name, authURL, registryURL string) (string, error) {
|
||||
// Derive the service name from the registry host (strip scheme).
|
||||
registryHost := strings.TrimPrefix(strings.TrimPrefix(registryURL, "https://"), "http://")
|
||||
url := fmt.Sprintf(
|
||||
"%s/token?service=%s&scope=repository:%s/%s:pull",
|
||||
authURL, registryHost, namespace, name)
|
||||
var t registryTokenResponse
|
||||
if err := fetchJSON(url, &t); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return t.Token, nil
|
||||
}
|
||||
|
||||
// fetchRegistryJSON performs an authenticated GET against the Docker registry and
|
||||
// decodes the JSON body into out. The Accept header covers manifest lists, OCI
|
||||
// indexes, v2 manifests and OCI manifests so that multi-arch tags are handled.
|
||||
func fetchRegistryJSON(url, token string, out interface{}) error {
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+token)
|
||||
req.Header.Set("Accept",
|
||||
"application/vnd.docker.distribution.manifest.list.v2+json,"+
|
||||
"application/vnd.oci.image.index.v1+json,"+
|
||||
"application/vnd.docker.distribution.manifest.v2+json,"+
|
||||
"application/vnd.oci.image.manifest.v1+json")
|
||||
resp, err := registryClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("HTTP %d for %s", resp.StatusCode, url)
|
||||
}
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return json.Unmarshal(body, out)
|
||||
}
|
||||
|
||||
// fetchImageCommand returns the default startup command for the given image tag
|
||||
// by walking manifest → (manifest list pick) → config blob.
|
||||
// Returns "" on any error so callers can treat it as best-effort.
|
||||
func fetchImageCommand(spec DockerImageSpec, tagName, token, registryURL string) string {
|
||||
if token == "" {
|
||||
return ""
|
||||
}
|
||||
manifestURL := fmt.Sprintf("%s/v2/%s/%s/manifests/%s",
|
||||
registryURL, spec.Namespace, spec.Name, tagName)
|
||||
|
||||
var top registryManifestOrList
|
||||
if err := fetchRegistryJSON(manifestURL, token, &top); err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
configDigest := top.Config.Digest
|
||||
|
||||
// Multi-arch: pick linux/amd64, fall back to first linux entry, then first entry.
|
||||
if len(top.Manifests) > 0 {
|
||||
chosen := top.Manifests[0].Digest
|
||||
for _, m := range top.Manifests {
|
||||
if m.Platform.OS == "linux" {
|
||||
chosen = m.Digest
|
||||
if m.Platform.Arch == "amd64" {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
archManifestURL := fmt.Sprintf("%s/v2/%s/%s/manifests/%s",
|
||||
registryURL, spec.Namespace, spec.Name, chosen)
|
||||
var archManifest registryManifestOrList
|
||||
if err := fetchRegistryJSON(archManifestURL, token, &archManifest); err != nil {
|
||||
return ""
|
||||
}
|
||||
configDigest = archManifest.Config.Digest
|
||||
}
|
||||
|
||||
if configDigest == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
blobURL := fmt.Sprintf("%s/v2/%s/%s/blobs/%s",
|
||||
registryURL, spec.Namespace, spec.Name, configDigest)
|
||||
var imgCfg registryImageConfig
|
||||
if err := fetchRegistryJSON(blobURL, token, &imgCfg); err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
return cmdSliceToString(imgCfg.Config.Cmd)
|
||||
}
|
||||
|
||||
// cmdSliceToString converts a Docker CMD slice to a human-readable command string.
|
||||
// Shell-form CMD (["/bin/sh", "-c", "actual command"]) is unwrapped to its inner
|
||||
// string; exec-form CMD is joined with spaces.
|
||||
func cmdSliceToString(cmd []string) string {
|
||||
if len(cmd) == 0 {
|
||||
return ""
|
||||
}
|
||||
if len(cmd) == 3 && cmd[1] == "-c" && (cmd[0] == "/bin/sh" || cmd[0] == "sh") {
|
||||
return cmd[2]
|
||||
}
|
||||
return strings.Join(cmd, " ")
|
||||
}
|
||||
|
||||
// reMarkdownImage matches the first Markdown image in a string, e.g. 
|
||||
var reMarkdownImage = regexp.MustCompile(`!\[[^\]]*\]\((https?://[^)]+)\)`)
|
||||
|
||||
@@ -465,6 +631,19 @@ func fetchJSON(url string, out interface{}) error {
|
||||
|
||||
// ─── Entry point ──────────────────────────────────────────────────────────────
|
||||
|
||||
// checkHubConnectivity returns true if the Docker Hub API is reachable.
|
||||
// It uses a short timeout so a missing network path fails fast rather than
|
||||
// blocking the whole startup sequence.
|
||||
func checkHubConnectivity(hubURL string) bool {
|
||||
client := &http.Client{Timeout: 5 * time.Second}
|
||||
resp, err := client.Get(hubURL + "/v2/")
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
resp.Body.Close()
|
||||
return true // even a 401/404 means the endpoint is reachable
|
||||
}
|
||||
|
||||
// StartDockerScraper starts the background Docker Hub scraper goroutine.
|
||||
// It runs a full scrape immediately at startup, then repeats on the configured
|
||||
// interval. This function blocks forever and is designed to be called via
|
||||
@@ -475,8 +654,12 @@ func StartDockerScraper() {
|
||||
fmt.Println("[docker-scraper] disabled (DOCKER_SCRAPER_ENABLED=false)")
|
||||
return
|
||||
}
|
||||
fmt.Printf("[docker-scraper] started — images=%d maxTags=%d interval=%dh\n",
|
||||
len(cfg.Images), cfg.MaxTags, cfg.IntervalHours)
|
||||
if !checkHubConnectivity(cfg.HubURL) {
|
||||
fmt.Printf("[docker-scraper] disabled — %s is not reachable (set DOCKER_SCRAPER_ENABLED=false to suppress)\n", cfg.HubURL)
|
||||
return
|
||||
}
|
||||
fmt.Printf("[docker-scraper] started — images=%d maxTags=%d interval=%dh hub=%s\n",
|
||||
len(cfg.Images), cfg.MaxTags, cfg.IntervalHours, cfg.HubURL)
|
||||
|
||||
runScrape(cfg)
|
||||
|
||||
@@ -491,7 +674,7 @@ func StartDockerScraper() {
|
||||
func runScrape(cfg scraperConfig) {
|
||||
fmt.Printf("[docker-scraper] cycle started at %s\n", time.Now().Format(time.RFC3339))
|
||||
for _, spec := range cfg.Images {
|
||||
if err := scrapeImage(spec, cfg.MaxTags); err != nil {
|
||||
if err := scrapeImage(spec, cfg); err != nil {
|
||||
fmt.Printf("[docker-scraper] %s/%s: %v\n", spec.Namespace, spec.Name, err)
|
||||
}
|
||||
}
|
||||
@@ -503,19 +686,18 @@ func runScrape(cfg scraperConfig) {
|
||||
// scrapeImage fetches Docker Hub metadata for one repository, then either creates
|
||||
// a new ProcessingResource in the catalog or extends the existing one with any
|
||||
// missing tag-instances.
|
||||
func scrapeImage(spec DockerImageSpec, maxTags int) error {
|
||||
func scrapeImage(spec DockerImageSpec, cfg scraperConfig) error {
|
||||
// ── Fetch image metadata ──────────────────────────────────────────────────
|
||||
var info hubRepoInfo
|
||||
repoURL := fmt.Sprintf("https://hub.docker.com/v2/repositories/%s/%s/",
|
||||
spec.Namespace, spec.Name)
|
||||
repoURL := fmt.Sprintf("%s/v2/repositories/%s/%s/", cfg.HubURL, spec.Namespace, spec.Name)
|
||||
if err := fetchJSON(repoURL, &info); err != nil {
|
||||
return fmt.Errorf("fetch repo info: %w", err)
|
||||
}
|
||||
|
||||
// ── Fetch tags ────────────────────────────────────────────────────────────
|
||||
tagsURL := fmt.Sprintf(
|
||||
"https://hub.docker.com/v2/repositories/%s/%s/tags?page_size=%d&ordering=last_updated",
|
||||
spec.Namespace, spec.Name, maxTags)
|
||||
"%s/v2/repositories/%s/%s/tags?page_size=%d&ordering=last_updated",
|
||||
cfg.HubURL, spec.Namespace, spec.Name, cfg.MaxTags)
|
||||
var tagsResp hubTagsResponse
|
||||
if err := fetchJSON(tagsURL, &tagsResp); err != nil {
|
||||
return fmt.Errorf("fetch tags: %w", err)
|
||||
@@ -524,6 +706,13 @@ func scrapeImage(spec DockerImageSpec, maxTags int) error {
|
||||
return nil // nothing to upsert
|
||||
}
|
||||
|
||||
// Pre-fetch a registry token once per image so all tags share it.
|
||||
token, _ := fetchRegistryToken(spec.Namespace, spec.Name, cfg.AuthURL, cfg.RegistryURL)
|
||||
cmds := make(map[string]string, len(tagsResp.Results))
|
||||
for _, t := range tagsResp.Results {
|
||||
cmds[t.Name] = fetchImageCommand(spec, t.Name, token, cfg.RegistryURL)
|
||||
}
|
||||
|
||||
adminReq := &tools.APIRequest{Admin: true}
|
||||
accessor := (&resources.ProcessingResource{}).GetAccessor(adminReq)
|
||||
|
||||
@@ -531,9 +720,9 @@ func scrapeImage(spec DockerImageSpec, maxTags int) error {
|
||||
existing := findProcessingResourceByName(accessor, resourceName)
|
||||
|
||||
if existing == nil {
|
||||
return createDockerProcessingResource(accessor, spec, resourceName, info, tagsResp.Results)
|
||||
return createDockerProcessingResource(accessor, spec, resourceName, info, tagsResp.Results, cmds)
|
||||
}
|
||||
return syncDockerInstances(accessor, existing, spec, tagsResp.Results)
|
||||
return syncDockerInstances(accessor, existing, spec, tagsResp.Results, cmds)
|
||||
}
|
||||
|
||||
// resourceName returns the canonical catalog name for a DockerImageSpec.
|
||||
@@ -589,6 +778,7 @@ func createDockerProcessingResource(
|
||||
name string,
|
||||
info hubRepoInfo,
|
||||
tags []hubTag,
|
||||
cmds map[string]string,
|
||||
) error {
|
||||
resource := &resources.ProcessingResource{
|
||||
AbstractInstanciatedResource: resources.AbstractInstanciatedResource[*resources.ProcessingInstance]{
|
||||
@@ -611,7 +801,7 @@ func createDockerProcessingResource(
|
||||
}
|
||||
|
||||
for i := range tags {
|
||||
resource.AddInstances(buildPeerlessInstance(spec, tags[i]))
|
||||
resource.AddInstances(buildPeerlessInstance(spec, tags[i], cmds[tags[i].Name]))
|
||||
}
|
||||
|
||||
// StoreOne goes through GenericStoreOne which calls AbstractResource.StoreDraftDefault()
|
||||
@@ -637,6 +827,7 @@ func syncDockerInstances(
|
||||
resource *resources.ProcessingResource,
|
||||
spec DockerImageSpec,
|
||||
tags []hubTag,
|
||||
cmds map[string]string,
|
||||
) error {
|
||||
existing := map[string]bool{}
|
||||
for _, inst := range resource.Instances {
|
||||
@@ -649,7 +840,7 @@ func syncDockerInstances(
|
||||
if existing[ref] {
|
||||
continue
|
||||
}
|
||||
resource.AddInstances(buildPeerlessInstance(spec, tags[i]))
|
||||
resource.AddInstances(buildPeerlessInstance(spec, tags[i], cmds[tags[i].Name]))
|
||||
added++
|
||||
}
|
||||
if added == 0 {
|
||||
@@ -673,15 +864,8 @@ func syncDockerInstances(
|
||||
// Origin.Ref != "" (set to the canonical docker pull reference)
|
||||
//
|
||||
// ProcessingInstance.StoreDraftDefault() enforces this invariant on write.
|
||||
func buildPeerlessInstance(spec DockerImageSpec, tag hubTag) *resources.ProcessingInstance {
|
||||
func buildPeerlessInstance(spec DockerImageSpec, tag hubTag, cmd string) *resources.ProcessingInstance {
|
||||
ref := dockerRef(spec, tag.Name)
|
||||
|
||||
// Collect architecture hint from the first image manifest entry (if any).
|
||||
arch := ""
|
||||
if len(tag.Images) > 0 {
|
||||
arch = tag.Images[0].Architecture
|
||||
}
|
||||
|
||||
return &resources.ProcessingInstance{
|
||||
ResourceInstance: resources.ResourceInstance[*resources.ResourcePartnerShip[*resources.ProcessingResourcePricingProfile]]{
|
||||
AbstractObject: utils.AbstractObject{
|
||||
@@ -699,11 +883,8 @@ func buildPeerlessInstance(spec DockerImageSpec, tag hubTag) *resources.Processi
|
||||
},
|
||||
Access: &resources.ResourceAccess{
|
||||
Container: &models.Container{
|
||||
Image: ref,
|
||||
// Command, Args, Env, Volumes left empty — image defaults apply.
|
||||
Env: map[string]string{
|
||||
"ARCH": arch,
|
||||
},
|
||||
Image: ref,
|
||||
Command: cmd, // default CMD from the image config (best-effort, "" if unavailable)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -31,9 +31,7 @@ func EmitNATS(user string, groups []string, message tools.PropalgationMessage) {
|
||||
b, _ := json.Marshal(message)
|
||||
switch message.Action {
|
||||
case tools.PB_SEARCH:
|
||||
fmt.Println("EMITNATS")
|
||||
SearchMu.Lock()
|
||||
fmt.Println("afterlock")
|
||||
SearchStream[user] = make(chan []byte, 128)
|
||||
SearchStreamSeen[user] = []string{}
|
||||
SearchStreamExtend[user] = []string{}
|
||||
@@ -88,8 +86,6 @@ func ListenNATS() {
|
||||
"dtype": p.GetType(),
|
||||
"data": p,
|
||||
})
|
||||
fmt.Println("WRAPPED CHECK")
|
||||
|
||||
SearchMu.Lock()
|
||||
if a := SearchStreamExtend[resp.User]; len(a) > 0 {
|
||||
wrapped, merr = json.Marshal(map[string]interface{}{
|
||||
@@ -106,7 +102,6 @@ func ListenNATS() {
|
||||
}
|
||||
ch := SearchStream[resp.User]
|
||||
SearchMu.Unlock()
|
||||
fmt.Println(resp.User, ch, merr, alreadySeen)
|
||||
if merr != nil || alreadySeen {
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user