# Copyright (C) 2025 Siemens
#
# SPDX-License-Identifier: MIT
"""
This module contains wrappers of the snapshot.debian.org machine-usable interface
documented in https://salsa.debian.org/snapshot-team/snapshot/raw/master/API.
"""
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from datetime import datetime
from debian import deb822
import logging
import requests
from requests.exceptions import RequestException
from ..dpkg import package
from ..util.checksum import (
ChecksumAlgo,
NoMatchingDigestError,
calculate_checksums,
checksums_from_dsc,
verify_best_matching_digest,
)
from ..download.resolver import RemoteFile, PackageResolverCache, Resolver, ResolveError
# Module-level logger, following the stdlib convention of one logger per module.
logger = logging.getLogger(__name__)
# Preferred lookup order of upstream archives; used by UpstreamResolver to sort
# candidate files so the most likely archive is tried first.
UPSTREAM_ARCHIVE_ORDER = ["debian", "debian-security", "debian-debug", "debian-ports"]
class SnapshotDataLakeError(Exception):
    """
    Base class for every exception raised by this snapshot client.
    """
class NotFoundOnSnapshotError(SnapshotDataLakeError, FileNotFoundError):
    """
    Raised when the requested file does not exist on the snapshot mirror.
    """
[docs]
class SnapshotResolveError(ResolveError):
    """Raised when resolving a package against the upstream snapshot mirror fails."""
    pass
class Package:
    """
    A source package name on the snapshot mirror, not tied to a specific version.
    """

    def __init__(self, sdl: "SnapshotDataLake", name: str):
        self.sdl = sdl
        self.name = name

    def versions(self) -> Iterable["SourcePackage"]:
        """
        Iterate all versions of a ``SourcePackage``.
        """
        response = self.sdl.get(path=f"/mr/package/{self.name}/").json()
        yield from (
            SourcePackage(self.sdl, self.name, entry["version"])
            for entry in response["result"]
        )
class SourcePackage:
    """
    Source package in a specific version.
    """

    def __init__(self, sdl: "SnapshotDataLake", name: str, version: str):
        self.sdl = sdl
        self.name = name
        self.version = version

    def srcfiles(
        self, archive: str | None = None, sha1: str | None = None
    ) -> Iterable["SnapshotRemoteFile"]:
        """
        Yield all files associated with the source package.

        Returns multiple SnapshotRemoteFile instances for a single hash in case
        the file is known under multiple names.
        If the package is not known to the snapshot mirror, raises NotFoundOnSnapshotError.
        If the filtering does not match any file, the iterator is empty.

        :param archive: if given, only yield files stored in this archive
        :param sha1: if given, only yield files with this SHA1 checksum
        """
        data = self.sdl.get(
            path=f"/mr/package/{self.name}/{self.version}/srcfiles?fileinfo=1"
        ).json()
        fileinfo = data.get("fileinfo")
        for entry in data.get("result", []):
            sha1sum = entry["hash"]
            for info in fileinfo[sha1sum]:
                rf = SnapshotRemoteFile.fromfileinfo(self.sdl, sha1sum, info)
                if archive and rf.archive_name != archive:
                    continue
                # Compare against the hash reported by the API rather than an
                # attribute lookup on the RemoteFile: its SHA1 is stored inside
                # the checksums mapping, not as a plain ``hash`` attribute.
                # The value is identical, since rf was built from sha1sum.
                if sha1 and sha1sum != sha1:
                    continue
                rf.architecture = "source"
                yield rf

    def binpackages(self) -> Iterable["BinaryPackage"]:
        """
        Yield all binary packages created from this source package.
        """
        data = self.sdl.get(
            path=f"/mr/package/{self.name}/{self.version}/binpackages"
        ).json()
        for b in data.get("result", []):
            yield BinaryPackage(self.sdl, b["name"], b["version"], self.name, self.version)
class BinaryPackage:
    """
    Binary package in a specific version.
    """

    def __init__(
        self,
        sdl: "SnapshotDataLake",
        binname: str,
        binversion: str,
        srcname: str | None,
        srcversion: str | None,
    ):
        self.sdl = sdl
        self.binname = binname
        self.binversion = binversion
        self.srcname = srcname
        self.srcversion = srcversion

    def files(self, arch: str | None = None) -> Iterable["SnapshotRemoteFile"]:
        """
        Yield all files associated with this binary package (e.g. per-architecture).

        If no architecture is specified, all packages are returned.
        Otherwise, only the packages with the matching architecture are returned.

        If we have information about the source package as well, we precisely resolve
        the binary package including the original path on the debian mirror. If not,
        we just resolve the file. The difference is only in the metadata, the file
        itself is the same in both cases.

        Returns multiple SnapshotRemoteFile instances for a single hash in case the
        file is known under multiple names.
        """
        if self.srcname and self.srcversion:
            # resolve via source package
            api = (
                f"/mr/package/{self.srcname}/{self.srcversion}"
                f"/binfiles/{self.binname}/{self.binversion}"
                "?fileinfo=1"
            )
        else:
            # resolve via binary only
            api = f"/mr/binary/{self.binname}/{self.binversion}/binfiles?fileinfo=1"
        data = self.sdl.get(path=api).json()
        fileinfo = data.get("fileinfo")
        # Default to [] so a response without "result" yields nothing instead of
        # raising TypeError (consistent with SourcePackage.srcfiles/binpackages).
        for entry in data.get("result", []):
            sha1sum = entry["hash"]
            for info in fileinfo[sha1sum]:
                rf = SnapshotRemoteFile.fromfileinfo(self.sdl, sha1sum, info)
                rf.architecture = entry["architecture"]
                if arch and arch != rf.architecture:
                    continue
                yield rf
@dataclass(kw_only=True)
class SnapshotRemoteFile(RemoteFile):
    """
    A single file stored on the snapshot farm.
    """

    # file size in bytes as reported by the snapshot fileinfo API
    size: int
    # original path of the file on the mirror
    path: str
    # unix timestamp of when the snapshot service first saw the file
    first_seen: int
    # dpkg architecture; filled in afterwards by callers that know it
    architecture: str | None = None

    @staticmethod
    def fromfileinfo(sdl, hash: str, fileinfo: Mapping) -> "SnapshotRemoteFile":
        """
        Factory to create a ``SnapshotRemoteFile`` from a fileinfo object.
        """
        name = fileinfo["name"]
        first_seen_dt = datetime.fromisoformat(fileinfo["first_seen"])
        return SnapshotRemoteFile(
            checksums={ChecksumAlgo.SHA1SUM: hash},
            filename=name,
            size=fileinfo["size"],
            archive_name=fileinfo["archive_name"],
            path=fileinfo["path"],
            first_seen=int(first_seen_dt.timestamp()),
            downloadurl=f"{sdl.url}/file/{hash}/{name}",
        )
class SnapshotDataLake:
    """
    Snapshot instance to query against. If you use this API from a tool,
    please use a dedicated requests session and set a custom user-agent header.
    """

    def __init__(self, url="https://snapshot.debian.org", session: requests.Session | None = None):
        """
        :param url: base URL of the snapshot server
        :param session: optional requests session; a fresh one is created when omitted
        """
        self.url = url
        # Reuse the same connection for all requests. The session is created per
        # instance: the previous ``session=requests.Session()`` default argument
        # was evaluated once at import time and silently shared by every
        # SnapshotDataLake instance.
        self.rs = session if session is not None else requests.Session()

    def get(self, path: str | None = None, url: str | None = None) -> requests.Response:
        """
        Perform a GET request on the snapshot server. Either a full URL or a path
        relative to the base URL must be provided.

        :raises ValueError: if both or neither of ``path`` and ``url`` are given
        :raises NotFoundOnSnapshotError: if the server answers with HTTP 404
        :raises SnapshotDataLakeError: on any other request failure
        """
        if (url is None) == (path is None):
            raise ValueError("either path or url must be provided")
        try:
            response: requests.Response = self.rs.get(self.url + path if path else url)
            if response.status_code == 404:
                raise NotFoundOnSnapshotError()
            response.raise_for_status()
            return response
        except RequestException as e:
            # chain the original exception for better diagnostics
            raise SnapshotDataLakeError(e) from e

    def packages(self) -> Iterable[Package]:
        """
        Iterate all known packages on the mirror. The request is costly.
        If you need to access a package by name, create the ``Package`` directly.
        """
        data = self.get(path="/mr/package/").json()
        for p in data.get("result", []):
            yield Package(self, p["package"])

    def fileinfo(self, hash: str) -> Iterable[SnapshotRemoteFile]:
        """
        Retrieve information about a file identified by its SHA1 hash.
        """
        data = self.get(path=f"/mr/file/{hash}/info").json()
        for f in data.get("result", []):
            yield SnapshotRemoteFile.fromfileinfo(self, hash, f)
class SnapshotRemoteDscFile:
    """
    Wrapper around a RemoteFile that only iterates the files referenced in the dsc file.
    """

    sdl: SnapshotDataLake
    dscfile: SnapshotRemoteFile
    allfiles: list[SnapshotRemoteFile]

    def __init__(
        self, sdl: SnapshotDataLake, dscfile: SnapshotRemoteFile, allfiles: list[SnapshotRemoteFile]
    ):
        self.sdl = sdl
        self.dscfile = dscfile
        # only keep candidates from the same archive as the .dsc file itself
        self.allfiles = [rf for rf in allfiles if rf.archive_name == dscfile.archive_name]
        self._fetch()

    def _fetch(self):
        # Download the .dsc once; its checksums and parsed content are kept for
        # later matching in srcfiles().
        response = self.sdl.get(url=self.dscfile.downloadurl)
        self.checksums = calculate_checksums(response.content)
        self._dsc = deb822.Dsc(response.content)

    @property
    def filename(self):
        return self.dscfile.filename

    @property
    def archive_name(self):
        return self.dscfile.archive_name

    @property
    def path(self):
        return self.dscfile.path

    def srcfiles(self) -> Iterable["SnapshotRemoteFile"]:
        """
        Yield the RemoteFile objects from self.allfiles whose checksums match
        the checksums declared in the .dsc file.
        """
        declared = checksums_from_dsc(self._dsc)
        for candidate in self.allfiles:
            try:
                matches = verify_best_matching_digest(
                    candidate.checksums, declared.get(candidate.filename)
                )
            except NoMatchingDigestError:
                continue
            if matches:
                yield candidate
class UpstreamResolver(Resolver):
    """
    Helper to lookup packages on an upstream snapshot server. The lookup works as follows:

    Binary package: ask the snapshot client for files of a binary package with name, version
    and architecture.

    Source package (with checksum): ask the snapshot client for all files related to the source
    package identified by name and version. Then, sort the list by the sorting order and
    filter all .dsc files in the returned list. For each dsc file, fetch it and compute the
    checksum. If the checksum is not matching, ignore it. If it is matching, yield it and yield
    all referenced source files of the .dsc file.

    Source package (without checksum): ask the snapshot client for all files related to the
    source package identified by name and version. Then, sort the list by the sorting order and
    deduplicate based on (archive_name, filename). Note that each deduplication contains the
    most recent file.

    Sorting order: First by archive_name (priority), then by first_seen (descending).

    Checksum computation: The checksums of the returned files are not checked at this stage
    (except for the .dsc files for source packages with checksum information). This operation is
    left to the caller (usually the downloader), as it creates potentially a lot of traffic
    between the snapshot mirror and the downloader. The resolving operations themselves are
    cached in the cache, but the download artifacts have to be cached by the caller.
    """

    def __init__(self, sdl: SnapshotDataLake, cache: PackageResolverCache | None = None):
        super().__init__(cache)
        self.sdl = sdl

    @classmethod
    def _sort_by_archive(
        cls,
        files: Iterable["SnapshotRemoteFile"] | Iterable["SnapshotRemoteDscFile"],
    ) -> list["SnapshotRemoteFile"] | list["SnapshotRemoteDscFile"]:
        """
        Sort the input list by priority of the upstream archives. By that, we can iterate
        the items in the most likely order to have checksum matches more likely early.
        """
        priority = {name: i for i, name in enumerate(UPSTREAM_ARCHIVE_ORDER)}
        default_prio = len(UPSTREAM_ARCHIVE_ORDER)
        return sorted(
            files,
            key=lambda f: (
                # Primary: archive priority (archives not in the order list sort last)
                priority.get(f.archive_name, default_prio),
                # Secondary: most recent "first_seen" first (descending)
                -f.first_seen,
            ),
        )

    @classmethod
    def _distinct_by_archive_filename(
        cls, files: Iterable[SnapshotRemoteFile]
    ) -> Iterable[SnapshotRemoteFile]:
        """
        Return an iterator over RemoteFiles made unique on the (archive, filename) key.
        If multiple elements share the same key, the first one seen is returned.
        """
        seen: set[tuple[str, str]] = set()
        for file in files:
            key = (file.archive_name, file.filename)
            if key not in seen:
                seen.add(key)
                yield file

    @classmethod
    def _resolve_dsc_files(
        cls, pkg: SourcePackage, archive: str | None = None
    ) -> Iterable["SnapshotRemoteDscFile"]:
        """
        Locate all .dsc files associated with the source package and lazily create
        RemoteDscFile instances to lookup associated artifacts.
        """
        files = cls._sort_by_archive(pkg.srcfiles(archive=archive))
        for f in files:
            if f.filename.endswith(".dsc"):
                yield SnapshotRemoteDscFile(sdl=pkg.sdl, dscfile=f, allfiles=files)

    def _filter_rel_sources(
        self, srcpkg: package.SourcePackage, sdlpkg: SourcePackage
    ) -> Iterable[SnapshotRemoteFile]:
        """
        A debian source package can be found in multiple snapshot archives with varying
        content and checksum. In case we have a checksum, download all .dsc files until
        we find the one with a matching checksum. Then use the .dsc file to locate all other
        referenced artifacts.
        """
        if not srcpkg.checksums:
            # a source package should be uniquely identifiable by just its name + version,
            # so we do not want to emit a warning here;
            # see https://lists.debian.org/debian-devel/2025/10/msg00236.html
            logger.info("no digest for %s. Lookup will be imprecise", srcpkg)
            yield from self._distinct_by_archive_filename(
                self._sort_by_archive(sdlpkg.srcfiles())
            )
            return
        for dsc in self._resolve_dsc_files(sdlpkg, archive=None):
            try:
                if verify_best_matching_digest(dsc.checksums, srcpkg.checksums):
                    yield dsc.dscfile
                    yield from dsc.srcfiles()
                    return
            except NoMatchingDigestError:
                continue

    def resolve(self, p: package.Package) -> list["RemoteFile"]:
        """
        Resolve a local package to references on the upstream snapshot mirror.

        :raises SnapshotResolveError: if the lookup on the snapshot mirror fails
        """
        # Determine which type of package and fetch files
        try:
            if p.is_source():
                files = self._filter_rel_sources(
                    p, SourcePackage(self.sdl, p.name, str(p.version))
                )
            else:
                files = BinaryPackage(self.sdl, p.name, str(p.version), None, None).files(
                    arch=p.architecture
                )
            # Materialize inside the try block: the generators only perform
            # their network requests during iteration.
            files_list = list(files)
        except SnapshotDataLakeError as e:
            # chain the original exception for better diagnostics
            raise SnapshotResolveError(e) from e
        return files_list