#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from hashlib import sha256
from urllib.parse import urlparse
import json
import os
import os.path
import shutil
import subprocess
import tarfile
import tempfile

# atomic-reactor needs a patch to make sure it works for this use case:
# https://github.com/containerbuildsystem/atomic-reactor/pull/1239
from atomic_reactor.util import RegistrySession, ImageName


def handle_identify_command(image, dest_path):
    with _buildah_build(image) as (container, mount_path):
        print(f'Using {container} at path {mount_path}...')
        # Determine the OS release from whichever package owns /etc/os-release.
        release = _run_command(
            ('rpm', '-q', '--queryformat', '%{VERSION}\n', '--root', mount_path,
             '-f', '/etc/os-release')).strip()
        src_rpms = _run_command(
            ('rpm', '-qa', '--root', mount_path, '--queryformat', '%{SOURCERPM}\n')
        ).splitlines()
        # De-duplicate and drop packages without a source RPM.
        src_rpms = sorted({src_rpm for src_rpm in src_rpms
                           if src_rpm and '(none)' not in src_rpm})

    os.makedirs(dest_path, exist_ok=True)
    with ThreadPoolExecutor(max_workers=20) as executor:
        for src_rpm in src_rpms:
            executor.submit(_download_srpm, src_rpm, release, dest_path)


def _download_srpm(src_rpm, release, dest_path):
    if os.path.exists(os.path.join(dest_path, src_rpm)):
        return
    rpm_name = src_rpm.replace('.src.rpm', '')
    print(f'Fetching source for {rpm_name}...')
    try:
        _run_command(
            ('dnf', 'download', '--release', release, '--source', rpm_name),
            {'cwd': dest_path})
    except RuntimeError:
        print(f'ERROR: Cannot download source for {rpm_name}')


def handle_create_command(dest_path, dest_image):
    with _buildah_build('scratch') as (container, mount_path):
        print(f'Using {container} at path {mount_path}...')
        for source_archive in sorted(os.listdir(dest_path)):
            source_archive_path = os.path.join(dest_path, source_archive)
            if not os.path.isfile(source_archive_path):
                continue
            print(f'Processing {source_archive} source archive...')
            if source_archive.endswith('.src.rpm'):
                copy_path = os.path.join(mount_path, 'RPMS')
            else:
                copy_path = os.path.join(mount_path, 'others')
            os.makedirs(copy_path, exist_ok=True)
            shutil.copy(source_archive_path, copy_path)
        _run_command(('buildah', 'commit', container, dest_image))


def handle_push_command(sources_path, image_reference):
    image = ImageName.parse(image_reference)
    _validate_image_reference(image)
    registry = RegistrySession(
        image.registry, access=('push', 'pull'),
        dockercfg_path=os.path.expanduser('~/.docker/config.json'))

    blobs = []
    for source_archive in sorted(os.listdir(sources_path)):
        source_archive_path = os.path.join(sources_path, source_archive)
        if not os.path.isfile(source_archive_path):
            continue
        print(f'Processing {source_archive} source archive...')
        blob_info = _compute_blob_info(source_archive_path)
        print(blob_info)
        blobs.append(blob_info)
        if _blob_exists(registry, image, blob_info['digest']):
            print('Blob already exists, skipping...')
            continue
        _create_source_blob(registry, image, source_archive_path, blob_info)

    image_config = _create_image_config(registry, image, blobs)
    _create_image_manifest(registry, image, blobs, image_config)


def _validate_image_reference(image):
    assert image.registry
    assert image.get_repo()
    assert image.tag


def _compute_blob_info(path):
    size = os.stat(path).st_size
    with open(path, 'rb') as f:
        hexdigest = sha256(f.read()).hexdigest()
    return {'digest': f'sha256:{hexdigest}', 'size': size}


def _blob_exists(registry, image, blob_digest):
    name = image.get_repo()
    response = registry.head(f'/v2/{name}/blobs/{blob_digest}')
    return response.status_code == 200
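

# The helpers below speak the Docker Registry HTTP API V2: each source archive
# is uploaded as a blob (POST /v2/<name>/blobs/uploads/ opens an upload
# session, then a PUT with a ?digest= parameter completes it monolithically),
# an image config blob is generated, and finally a schema 2 manifest
# referencing all of them is PUT under the target tag.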
def _create_source_blob(registry, image, path, blob_info):
    basename = os.path.basename(path)
    # The on-the-fly tarball branch is deliberately disabled ('False and ...');
    # see the warning message below for the reason.
    if False and not basename.endswith('.tar.gz'):
        print(
            'WARNING: Generating tarball on the fly causes buildah issues. '
            'It\'s recommended to create tarballs for each source archive instead.')
        tarball_path = os.path.join('/tmp', basename + '.tar.gz')
        with tarfile.open(tarball_path, 'w:gz') as archive:
            archive.add(path)
        print(f'Created tarball at {tarball_path}')
        # NOTE: if this branch is ever re-enabled, blob_info would need to be
        # recomputed from the tarball so the digest matches what is uploaded.
        _create_blob(registry, image, tarball_path, blob_info)
    else:
        _create_blob(registry, image, path, blob_info)


def _create_blob(registry, image, path, blob_info):
    name = image.get_repo()
    # Open a blob upload session; the registry replies with the upload URL.
    response = registry.post(f'/v2/{name}/blobs/uploads/')
    response.raise_for_status()
    location = response.headers['Location']
    # Complete the upload in a single request with the expected digest.
    with open(path, 'rb') as f:
        response = registry.put(
            urlparse(location).path,
            data=f,
            params={'digest': blob_info['digest']},
        )
    response.raise_for_status()


def _create_image_config(registry, image, blobs):
    config = {
        # TODO: Placeholders for now
        'architecture': 'amd64',
        'os': 'linux',
        'rootfs': {
            'type': 'layers',
            'diff_ids': [blob['digest'] for blob in blobs],
        },
    }
    with tempfile.NamedTemporaryFile(mode='w') as f:
        json.dump(config, f, sort_keys=True)
        f.flush()
        blob_info = _compute_blob_info(f.name)
        if not _blob_exists(registry, image, blob_info['digest']):
            print('Image config blob does not exist, creating it...')
            _create_blob(registry, image, f.name, blob_info)
    return blob_info


def _create_image_manifest(registry, image, blobs, config):
    layers = [
        {
            'mediaType': 'application/vnd.docker.image.rootfs.diff.tar.gzip',
            'size': blob['size'],
            'digest': blob['digest'],
        }
        for blob in blobs
    ]
    image_manifest = {
        'schemaVersion': 2,
        'mediaType': 'application/vnd.docker.distribution.manifest.v2+json',
        'config': {
            'mediaType': 'application/vnd.docker.container.image.v1+json',
            'size': config['size'],
            'digest': config['digest'],
        },
        'layers': layers,
    }
    headers = {'Content-Type': 'application/vnd.docker.distribution.manifest.v2+json'}
    repo = image.get_repo()
    reference = image.tag
    response = registry.put(
        f'/v2/{repo}/manifests/{reference}', json=image_manifest, headers=headers)
    response.raise_for_status()


@contextmanager
def _buildah_build(image):
    container = _run_command(('buildah', 'from', image)).strip()
    mount_path = _run_command(('buildah', 'mount', container)).strip()
    try:
        yield container, mount_path
    finally:
        # Clean up the working container even if the caller raised.
        _run_command(('buildah', 'umount', container))
        _run_command(('buildah', 'rm', container))


def _run_command(command, params=None):
    if params is None:
        params = {}
    params.setdefault('capture_output', True)
    params.setdefault('universal_newlines', True)
    params.setdefault('encoding', 'utf-8')
    response = subprocess.run(command, **params)
    returncode = response.returncode
    if returncode != 0:
        raise RuntimeError(f'Command "{command}" failed with return code {returncode}')
    return response.stdout
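

# NOTE: 'buildah mount' generally requires root privileges (or running inside
# 'buildah unshare' for rootless setups), so the 'identify' and 'create'
# subcommands typically need to be run with elevated privileges.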


if __name__ == '__main__':
    from argparse import ArgumentParser

    parser = ArgumentParser(
        description='Tool to build a source image based on an existing OCI image')
    subparsers = parser.add_subparsers(dest='command')

    identifier_parser = subparsers.add_parser(
        'identify', help='Identify and download the source code for an existing OCI image')
    identifier_parser.add_argument('image', help='Reference to an existing OCI image')
    identifier_parser.add_argument(
        '--dest-path', default='sources',
        help='Local path to download sources, defaults to "sources"')

    create_parser = subparsers.add_parser(
        'create', help='Create a source image with the provided sources')
    create_parser.add_argument(
        'dest_image', help='Target reference to be used for the source image')
    create_parser.add_argument(
        '--dest-path', default='sources',
        help='Local path containing sources, defaults to "sources"')

    push_parser = subparsers.add_parser(
        'push', help='Create a source image directly in a container registry')
    push_parser.add_argument(
        'image', help='Target reference to be used for the source image, e.g. quay.io/foo/bar:src')
    push_parser.add_argument(
        '--sources-path', default='sources',
        help='Local path containing sources, defaults to "sources"')

    args = parser.parse_args()

    if args.command == 'identify':
        handle_identify_command(args.image, args.dest_path)
    elif args.command == 'create':
        handle_create_command(args.dest_path, args.dest_image)
    elif args.command == 'push':
        handle_push_command(args.sources_path, args.image)
    else:
        parser.error('Please specify a valid subcommand')
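
# Example usage (script name and image references are hypothetical; assumes
# buildah and dnf are available on the host, and 'push' assumes registry
# credentials in ~/.docker/config.json):
#
#   ./source_image.py identify quay.io/foo/bar:latest --dest-path sources
#   ./source_image.py create quay.io/foo/bar:src --dest-path sources
#   ./source_image.py push quay.io/foo/bar:src --sources-path sources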