from data.oci_model import tag as tag_model from data.database import Tag, Channel def get_channel_releases(repo, channel): """ Return all previously linked tags. This works based upon Tag lifetimes. """ tag_kind_id = Channel.tag_kind.get_id('channel') channel_name = channel.name return (Tag .select(Tag, Channel) .join(Channel, on=(Tag.id == Channel.linked_tag)) .where(Channel.repository == repo, Channel.name == channel_name, Channel.tag_kind == tag_kind_id, Channel.lifetime_end != None) .order_by(Tag.lifetime_end)) def get_channel(repo, channel_name): """ Find a Channel by name. """ channel = tag_model.get_tag(repo, channel_name, "channel") return channel def get_tag_channels(repo, tag_name, active=True): """ Find the Channels associated with a Tag. """ tag = tag_model.get_tag(repo, tag_name, "release") query = tag.tag_parents if active: query = tag_model.tag_alive_oci(query) return query def delete_channel(repo, channel_name): """ Delete a channel by name. """ return tag_model.delete_tag(repo, channel_name, "channel") def create_or_update_channel(repo, channel_name, tag_name): """ Creates or updates a channel to include a particular tag. """ tag = tag_model.get_tag(repo, tag_name, 'release') return tag.create_or_update_tag(repo, channel_name, linked_tag=tag, tag_kind="channel") def get_repo_channels(repo): """ Creates or updates a channel to include a particular tag. """ tag_kind_id = Channel.tag_kind.get_id('channel') query = (Channel .select(Channel, Tag) .join(Tag, on=(Tag.id == Channel.linked_tag)) .where(Channel.repository == repo, Channel.tag_kind == tag_kind_id)) return tag_model.tag_alive_oci(query, cls=Channel)