#!/bin/env python3
"""Tool to build a source image based on an existing OCI image.

Two subcommands are provided:

* ``identify`` -- mount an existing image with buildah, list the source RPMs
  of the packages installed in it and download them with ``dnf download``.
* ``create`` -- copy previously collected source archives into a ``scratch``
  container and commit it as an image.
"""
from argparse import ArgumentParser
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
import os
import os.path
import shutil
import subprocess

# File name suffix carried by source RPM archives.
_SRPM_SUFFIX = '.src.rpm'


def handle_identify_command(image, dest_path):
    """Download the source RPMs of every package installed in *image*.

    Args:
        image: Reference to an existing OCI image, passed to ``buildah from``.
        dest_path: Local directory receiving the downloads; created if missing.

    Raises:
        RuntimeError: If a buildah or rpm command exits with a non-zero code.
    """
    with _buildah_build(image) as (container, mount_path):
        print(f'Using {container} at path {mount_path}...')
        # Version of the package owning /etc/os-release inside the image;
        # later handed to dnf as the release to download sources for.
        release = _run_command(
            ('rpm', '-q', '--queryformat', '%{VERSION}\n', '--root', mount_path,
             '-f', '/etc/os-release')).strip()
        reported = _run_command(
            ('rpm', '-qa', '--root', mount_path,
             '--queryformat', '%{SOURCERPM}\n')).splitlines()

    # The downloads below never touch the mount, so the working container is
    # released before they start.
    # Several binary packages can share one source RPM: de-duplicate, and drop
    # blank lines as well as entries reported as "(none)".
    src_rpms = sorted({name for name in reported if name and '(none)' not in name})

    os.makedirs(dest_path, exist_ok=True)

    with ThreadPoolExecutor(max_workers=20) as executor:
        futures = {
            executor.submit(_download_srpm, src_rpm, release, dest_path): src_rpm
            for src_rpm in src_rpms
        }

    # Leaving the executor context waits for every task, so all futures are
    # done here. _download_srpm already reports RuntimeError itself; anything
    # else (e.g. dnf not installed) would otherwise be lost inside the future.
    for future, src_rpm in futures.items():
        error = future.exception()
        if error is not None:
            print(f'ERROR: Unexpected failure while fetching {src_rpm}: {error!r}')


def _download_srpm(src_rpm, release, dest_path):
    """Download one source RPM into *dest_path* unless it is already there.

    A failing ``dnf download`` is reported on stdout and otherwise ignored so
    that one missing source does not abort the remaining downloads.

    Args:
        src_rpm: Source RPM file name, e.g. ``foo-1.0-1.src.rpm``.
        release: Release value passed to ``dnf download --release``.
        dest_path: Directory used as working directory for ``dnf``.
    """
    if os.path.exists(os.path.join(dest_path, src_rpm)):
        return

    # Strip only a trailing suffix; a plain str.replace() would also remove
    # the text if it happened to occur in the middle of the name.
    if src_rpm.endswith(_SRPM_SUFFIX):
        rpm_name = src_rpm[:-len(_SRPM_SUFFIX)]
    else:
        rpm_name = src_rpm

    print(f'Fetching source for {rpm_name}...')
    try:
        _run_command(
            ('dnf', 'download', '--release', release, '--source', rpm_name),
            {'cwd': dest_path})
    except RuntimeError:
        print(f'ERROR: Cannot download source for {rpm_name}')


def handle_create_command(dest_path, dest_image):
    """Build an image from the source archives found in *dest_path*.

    Regular files ending in ``.src.rpm`` are copied into ``RPMS`` and all
    other regular files into ``others`` inside a container created from
    ``scratch``; sub-directories of *dest_path* are skipped.

    Args:
        dest_path: Local directory containing the source archives.
        dest_image: Reference under which the final image is committed.

    Raises:
        RuntimeError: If a buildah command exits with a non-zero code.
    """
    with _buildah_build('scratch') as (container, mount_path):
        print(f'Using {container} at path {mount_path}...')
        for source_archive in sorted(os.listdir(dest_path)):
            source_archive_path = os.path.join(dest_path, source_archive)
            if not os.path.isfile(source_archive_path):
                continue

            print(f'Processing {source_archive} source archive...')
            if source_archive.endswith(_SRPM_SUFFIX):
                copy_path = os.path.join(mount_path, 'RPMS')
            else:
                copy_path = os.path.join(mount_path, 'others')
            os.makedirs(copy_path, exist_ok=True)
            shutil.copy(source_archive_path, copy_path)
            # NOTE(review): commits after every archive under the archive's
            # file name -- presumably an intermediate checkpoint per archive;
            # confirm this is intended and that such names are always
            # accepted by buildah as image references.
            _run_command(('buildah', 'commit', container, source_archive))

        _run_command(('buildah', 'commit', container, dest_image))


@contextmanager
def _buildah_build(image):
    """Create and mount a buildah working container, cleaning up on exit.

    Args:
        image: Image reference handed to ``buildah from``.

    Yields:
        A ``(container, mount_path)`` tuple with the stripped output of
        ``buildah from`` and ``buildah mount`` respectively.

    Raises:
        RuntimeError: If any buildah command exits with a non-zero code.
    """
    container = _run_command(('buildah', 'from', image)).strip()
    try:
        mount_path = _run_command(('buildah', 'mount', container)).strip()
        try:
            yield container, mount_path
        finally:
            # Unmount even when the body of the ``with`` statement raised.
            _run_command(('buildah', 'umount', container))
    finally:
        # Always remove the working container, including when mounting or
        # unmounting failed.
        _run_command(('buildah', 'rm', container))


def _run_command(command, params=None):
    """Run *command* and return its standard output.

    Args:
        command: Sequence of program name and arguments.
        params: Optional mapping of extra keyword arguments for
            ``subprocess.run``. The mapping is not modified.

    Returns:
        The ``stdout`` attribute of the completed process; decoded text with
        the default settings.

    Raises:
        RuntimeError: If the command exits with a non-zero return code.
    """
    # Work on a copy so the defaults below never leak into the caller's dict.
    options = dict(params) if params else {}
    options.setdefault('capture_output', True)
    options.setdefault('universal_newlines', True)
    options.setdefault('encoding', 'utf-8')

    response = subprocess.run(command, **options)
    returncode = response.returncode
    if returncode != 0:
        raise RuntimeError(f'Command "{command}" failed with return code {returncode}')

    return response.stdout


def _build_parser():
    """Return the argument parser describing both subcommands."""
    parser = ArgumentParser(
        description='Tool to build a source image based on an existing OCI image')
    subparsers = parser.add_subparsers(dest='command')

    identifier_parser = subparsers.add_parser(
        'identify', help='Identify and download the source code for an existing OCI image')
    identifier_parser.add_argument('image', help='Reference to an existing OCI image')
    identifier_parser.add_argument(
        '--dest-path', default='sources',
        help='Local path to download sources, defaults to "sources"')

    create_parser = subparsers.add_parser(
        'create', help='Create a source image with the provide sources')
    create_parser.add_argument(
        'dest_image', help='Target reference to be used for the source image')
    create_parser.add_argument(
        '--dest-path', default='sources',
        help='Local path containing sources, defaults to "sources"')

    return parser


def main(argv=None):
    """Parse *argv* (``sys.argv[1:]`` when ``None``) and run the subcommand.

    Raises:
        ValueError: If no subcommand was given on the command line.
    """
    args = _build_parser().parse_args(argv)

    if args.command == 'identify':
        handle_identify_command(args.image, args.dest_path)
    elif args.command == 'create':
        handle_create_command(args.dest_path, args.dest_image)
    else:
        raise ValueError('Please specify a valid subcommand')


if __name__ == '__main__':
    main()