from datetime import datetime, timedelta

import pytest

from playhouse.test_utils import assert_query_count

from app import docker_v2_signing_key
from data import model
from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest, ManifestBlob,
                           ManifestLegacyImage, ManifestLabel, TagManifest, RepositoryTag, Image,
                           TagManifestLabel, TagManifest, TagManifestLabel)
from data.registry_model.registry_pre_oci_model import PreOCIModel
from data.registry_model.datatypes import RepositoryReference

from test.fixtures import *

@pytest.fixture()
def pre_oci_model(initialized_db):
  """ Returns a new PreOCIModel instance for each test.

      `initialized_db` is not used directly; it is requested so that fixture runs first
      (presumably preparing the test database -- confirm in test.fixtures).
  """
  registry_model = PreOCIModel()
  return registry_model


@pytest.mark.parametrize('names, expected', [
  (['unknown'], None),
  (['latest'], {'latest'}),
  (['latest', 'prod'], {'latest', 'prod'}),
  (['latest', 'prod', 'another'], {'latest', 'prod'}),
  (['foo', 'prod'], {'prod'}),
])
def test_find_matching_tag(names, expected, pre_oci_model):
  """ Checks find_matching_tag against devtable/simple: the returned tag's name must be one
      of `expected`, or the result must be None when `expected` is None.
  """
  repository_ref = RepositoryReference.for_repo_obj(
    model.repository.get_repository('devtable', 'simple'))
  matched_tag = pre_oci_model.find_matching_tag(repository_ref, names)

  if expected is not None:
    assert matched_tag.name in expected
    return

  assert matched_tag is None


@pytest.mark.parametrize('repo_namespace, repo_name, expected', [
  ('devtable', 'simple', {'latest', 'prod'}),
  ('buynlarge', 'orgrepo', {'latest', 'prod'}),
])
def test_get_most_recent_tag(repo_namespace, repo_name, expected, pre_oci_model):
  """ Checks that get_most_recent_tag returns a tag whose name is one of `expected`, or None
      when `expected` is None.
  """
  repository_ref = RepositoryReference.for_repo_obj(
    model.repository.get_repository(repo_namespace, repo_name))
  newest_tag = pre_oci_model.get_most_recent_tag(repository_ref)

  if expected is not None:
    assert newest_tag.name in expected
    return

  assert newest_tag is None


@pytest.mark.parametrize('repo_namespace, repo_name, expected', [
  ('devtable', 'simple', True),
  ('buynlarge', 'orgrepo', True),
  ('buynlarge', 'unknownrepo', False),
])
def test_lookup_repository(repo_namespace, repo_name, expected, pre_oci_model):
  """ Checks that lookup_repository yields a truthy reference when `expected` is truthy and
      None otherwise.
  """
  looked_up = pre_oci_model.lookup_repository(repo_namespace, repo_name)

  if not expected:
    assert looked_up is None
    return

  assert looked_up


@pytest.mark.parametrize('repo_namespace, repo_name', [
  ('devtable', 'simple'),
  ('buynlarge', 'orgrepo'),
])
def test_lookup_manifests(repo_namespace, repo_name, pre_oci_model):
  """ Looks up the manifest of the `latest` tag by its digest and checks that the result
      matches the tag's manifest and carries a legacy image.
  """
  repository_ref = RepositoryReference.for_repo_obj(
    model.repository.get_repository(repo_namespace, repo_name))

  latest_tag = pre_oci_model.find_matching_tag(repository_ref, ['latest'])
  tag_manifest = pre_oci_model.get_manifest_for_tag(latest_tag)

  by_digest = pre_oci_model.lookup_manifest_by_digest(repository_ref, tag_manifest.digest,
                                                      include_legacy_image=True)
  assert by_digest._db_id == tag_manifest._db_id
  assert by_digest.digest == tag_manifest.digest
  assert by_digest.legacy_image


def test_lookup_unknown_manifest(pre_oci_model):
  """ Checks that looking up the digest `sha256:deadbeef` in devtable/simple returns None. """
  repository_ref = RepositoryReference.for_repo_obj(
    model.repository.get_repository('devtable', 'simple'))

  missing = pre_oci_model.lookup_manifest_by_digest(repository_ref, 'sha256:deadbeef')
  assert missing is None


@pytest.mark.parametrize('repo_namespace, repo_name', [
  ('devtable', 'simple'),
  ('devtable', 'complex'),
  ('devtable', 'history'),
  ('buynlarge', 'orgrepo'),
])
def test_legacy_images(repo_namespace, repo_name, pre_oci_model):
  """ Walks every legacy image of the repository and checks that each can be looked up
      individually, with the expected query count, parents, tags and database row, and that
      an unknown docker image ID yields None.
  """
  repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)
  legacy_images = pre_oci_model.get_legacy_images(repository_ref)
  assert len(legacy_images)

  found_tags = set()
  for image in legacy_images:
    # Unmeasured lookup: only used to learn whether the image has parents, which determines
    # the query count expected for the measured lookup below.
    found_image = pre_oci_model.get_legacy_image(repository_ref, image.docker_image_id,
                                                 include_parents=True)

    with assert_query_count(4 if found_image.parents else 3):
      found_image = pre_oci_model.get_legacy_image(repository_ref, image.docker_image_id,
                                                   include_parents=True)
      assert found_image.docker_image_id == image.docker_image_id
      assert found_image.parents == image.parents

    # Check that the tags list can be retrieved.
    assert image.tags is not None
    found_tags.update(tag.name for tag in image.tags)

    # Check against the actual DB row.
    model_image = model.image.get_image(repository_ref._db_id, found_image.docker_image_id)
    assert model_image.id == found_image._db_id
    assert (list(reversed(model_image.ancestor_id_list())) ==
            [p._db_id for p in found_image.parents])

    # Try without parents and ensure that reading `parents` raises an exception. The attribute
    # is only accessed, not asserted on: an `assert` in here would raise AssertionError for a
    # non-empty value and satisfy pytest.raises(Exception) even if the access never raised.
    found_image = pre_oci_model.get_legacy_image(repository_ref, image.docker_image_id,
                                                 include_parents=False)
    with pytest.raises(Exception):
      _ = found_image.parents

  assert found_tags

  unknown = pre_oci_model.get_legacy_image(repository_ref, 'unknown', include_parents=True)
  assert unknown is None


def test_manifest_labels(pre_oci_model):
  """ Exercises the create / get / list / delete cycle for a label on the manifest of the
      `latest` tag in devtable/simple.
  """
  repository_ref = RepositoryReference.for_repo_obj(
    model.repository.get_repository('devtable', 'simple'))
  latest_tag = pre_oci_model.find_matching_tag(repository_ref, ['latest'])
  manifest = pre_oci_model.get_manifest_for_tag(latest_tag)

  # Add a label and verify the fields of the returned object.
  label = pre_oci_model.create_manifest_label(manifest, 'foo', 'bar', 'api')
  assert label.key == 'foo'
  assert label.value == 'bar'
  assert label.source_type_name == 'api'
  assert label.media_type_name == 'text/plain'

  # The label must be retrievable by its UUID.
  looked_up = pre_oci_model.get_manifest_label(manifest, label.uuid)
  assert looked_up == label

  # The label must show up in the full listing and under a matching key prefix.
  assert label in pre_oci_model.list_manifest_labels(manifest)
  assert label in pre_oci_model.list_manifest_labels(manifest, key_prefix='fo')

  # The label must be absent under a non-matching key prefix.
  assert label not in pre_oci_model.list_manifest_labels(manifest, key_prefix='ba')

  # After deletion the label can be neither fetched nor listed.
  assert pre_oci_model.delete_manifest_label(manifest, label.uuid)
  assert pre_oci_model.get_manifest_label(manifest, label.uuid) is None
  assert label not in pre_oci_model.list_manifest_labels(manifest)


@pytest.mark.parametrize('repo_namespace, repo_name', [
  ('devtable', 'simple'),
  ('devtable', 'complex'),
  ('devtable', 'history'),
  ('buynlarge', 'orgrepo'),
])
def test_repository_tags(repo_namespace, repo_name, pre_oci_model):
  """ Lists the repository's tags and checks that each tag, and its legacy image when present,
      can be fetched individually within the asserted query counts.
  """
  repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)

  with assert_query_count(1):
    listed_tags = pre_oci_model.list_repository_tags(repository_ref, include_legacy_images=True)
    assert len(listed_tags)

  for listed_tag in listed_tags:
    with assert_query_count(2):
      fetched_tag = pre_oci_model.get_repo_tag(repository_ref, listed_tag.name,
                                               include_legacy_image=True)
      assert fetched_tag == listed_tag

    # Only tags that have a legacy image get the image lookup check.
    if fetched_tag.legacy_image is not None:
      with assert_query_count(2):
        docker_image_id = fetched_tag.legacy_image.docker_image_id
        fetched_image = pre_oci_model.get_legacy_image(repository_ref, docker_image_id)
        assert fetched_image == fetched_tag.legacy_image


def test_repository_tag_history(pre_oci_model):
  """ Checks that listing the tag history of devtable/history runs two queries and returns
      two entries with the has-more flag unset.
  """
  repository_ref = pre_oci_model.lookup_repository('devtable', 'history')

  with assert_query_count(2):
    entries, more_available = pre_oci_model.list_repository_tag_history(repository_ref)
    assert not more_available
    assert len(entries) == 2


@pytest.mark.parametrize('repo_namespace, repo_name', [
  ('devtable', 'simple'),
  ('devtable', 'complex'),
  ('devtable', 'history'),
  ('buynlarge', 'orgrepo'),
])
def test_delete_tags(repo_namespace, repo_name, pre_oci_model):
  """ Deletes every tag in the repository and checks that none can be found afterwards while
      the tag history keeps the same number of entries.
  """
  repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)
  existing_tags = pre_oci_model.list_repository_tags(repository_ref)
  assert len(existing_tags)

  # Snapshot the history so it can be compared once the tags are gone.
  history_before, _ = pre_oci_model.list_repository_tag_history(repository_ref, size=1000)
  assert len(history_before) >= len(existing_tags)

  # Remove the tags one at a time, verifying each deletion right away.
  for doomed_tag in existing_tags:
    assert pre_oci_model.delete_tag(repository_ref, doomed_tag.name)

    # A lookup of the deleted tag must come back empty using a single query.
    with assert_query_count(1):
      lookup_result = pre_oci_model.get_repo_tag(repository_ref, doomed_tag.name,
                                                 include_legacy_image=True)
      assert lookup_result is None

  # The repository must now list no tags at all.
  remaining_tags = pre_oci_model.list_repository_tags(repository_ref)
  assert len(remaining_tags) == 0

  # The history must contain as many entries as before the deletions.
  history_after, _ = pre_oci_model.list_repository_tag_history(repository_ref, size=1000)
  assert len(history_after) == len(history_before)


@pytest.mark.parametrize('use_manifest', [
  True,
  False,
])
def test_retarget_tag_history(use_manifest, pre_oci_model):
  """ Reverts `latest` in devtable/history to the target of its second history entry, given
      either as a manifest or as a legacy image, and checks the tag and history afterwards.
  """
  repository_ref = pre_oci_model.lookup_repository('devtable', 'history')
  history_before, _ = pre_oci_model.list_repository_tag_history(repository_ref)
  earlier_entry = history_before[1]

  if not use_manifest:
    target = earlier_entry.legacy_image
  else:
    target = pre_oci_model.lookup_manifest_by_digest(repository_ref,
                                                     earlier_entry.manifest_digest,
                                                     allow_dead=True)

  # Point the tag at the earlier target, flagged as a reversion.
  assert target
  reverted_tag = pre_oci_model.retarget_tag(repository_ref, 'latest', target,
                                            is_reversion=True)

  # The returned tag must reference the requested target.
  if not use_manifest:
    assert reverted_tag.legacy_image == target
  else:
    assert reverted_tag.manifest_digest == target.digest

  # The retarget must have added exactly one history entry.
  history_after, _ = pre_oci_model.list_repository_tag_history(repository_ref)
  assert len(history_after) == len(history_before) + 1


def test_retarget_tag(pre_oci_model):
  """ Points `latest` in devtable/complex at the legacy image of `prod` and checks that the
      tag changed and the history gained one entry.
  """
  repository_ref = pre_oci_model.lookup_repository('devtable', 'complex')
  history_before, _ = pre_oci_model.list_repository_tag_history(repository_ref)

  source_tag = pre_oci_model.get_repo_tag(repository_ref, 'prod', include_legacy_image=True)
  target_image = source_tag.legacy_image

  # Move `latest` onto the image that `prod` references.
  moved_tag = pre_oci_model.retarget_tag(repository_ref, 'latest', target_image)

  # The returned tag must reference the same legacy image as `prod`.
  assert moved_tag.legacy_image == source_tag.legacy_image

  # The retarget must have added exactly one history entry.
  history_after, _ = pre_oci_model.list_repository_tag_history(repository_ref)
  assert len(history_after) == len(history_before) + 1


def test_change_repository_tag_expiration(pre_oci_model):
  """ Sets an expiration two days ahead on `latest` in devtable/simple, which starts without
      one, and checks that the tag then reports a lifetime end.
  """
  repository_ref = pre_oci_model.lookup_repository('devtable', 'simple')

  tag_before = pre_oci_model.get_repo_tag(repository_ref, 'latest')
  assert tag_before.lifetime_end_ts is None

  expires_at = datetime.utcnow() + timedelta(days=2)
  result = pre_oci_model.change_repository_tag_expiration(tag_before, expires_at)
  old_expiration, succeeded = result

  assert succeeded
  assert old_expiration is None

  tag_after = pre_oci_model.get_repo_tag(repository_ref, 'latest')
  assert tag_after.lifetime_end_ts is not None


@pytest.mark.parametrize('repo_namespace, repo_name, expected_non_empty', [
  ('devtable', 'simple', []),
  ('devtable', 'complex', ['prod', 'v2.0']),
  ('devtable', 'history', ['latest']),
  ('buynlarge', 'orgrepo', []),
  ('devtable', 'gargantuan', ['v2.0', 'v3.0', 'v4.0', 'v5.0', 'v6.0']),
])
def test_get_legacy_images_owned_by_tag(repo_namespace, repo_name, expected_non_empty,
                                        pre_oci_model):
  """ Checks that the tags for which get_legacy_images_owned_by_tag returns a truthy result
      are exactly those named in `expected_non_empty`.
  """
  repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)
  repo_tags = pre_oci_model.list_repository_tags(repository_ref)
  assert len(repo_tags)

  owning_tag_names = {repo_tag.name for repo_tag in repo_tags
                      if pre_oci_model.get_legacy_images_owned_by_tag(repo_tag)}

  assert owning_tag_names == set(expected_non_empty)


def test_get_security_status(pre_oci_model):
  """ Checks that get_security_status returns a truthy value for the legacy image of every
      tag in devtable/simple.
  """
  repository_ref = pre_oci_model.lookup_repository('devtable', 'simple')
  repo_tags = pre_oci_model.list_repository_tags(repository_ref, include_legacy_images=True)
  assert len(repo_tags)

  for repo_tag in repo_tags:
    status = pre_oci_model.get_security_status(repo_tag.legacy_image)
    assert status


@pytest.fixture()
def clear_rows(initialized_db):
  """ Deletes all rows from the manifest-related tables so that tests can backfill them.

      `initialized_db` is not used directly; it is requested so that fixture runs first.
  """
  # The tables are emptied in this exact order. NOTE(review): presumably so that rows holding
  # references are removed before the rows they reference -- confirm against the schema.
  tables_in_deletion_order = (
    TagManifestLabelMap,
    ManifestLabel,
    ManifestBlob,
    ManifestLegacyImage,
    TagManifestToManifest,
    Manifest,
    TagManifestLabel,
    TagManifest,
  )

  for table in tables_in_deletion_order:
    table.delete().execute()


@pytest.mark.parametrize('repo_namespace, repo_name', [
  ('devtable', 'simple'),
  ('devtable', 'complex'),
  ('devtable', 'history'),
  ('buynlarge', 'orgrepo'),
])
def test_backfill_manifest_for_tag(repo_namespace, repo_name, clear_rows, pre_oci_model):
  """ With the manifest tables cleared, backfills a manifest for every tag and checks that
      the parsed manifest agrees with the tag's legacy image and its parents.
  """
  repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)

  # Before backfilling, no tag may have a manifest digest.
  tags_before = pre_oci_model.list_repository_tags(repository_ref)
  assert tags_before

  for unfilled_tag in tags_before:
    assert not unfilled_tag.manifest_digest
    assert pre_oci_model.backfill_manifest_for_tag(unfilled_tag)

  # After backfilling, every tag must have a manifest consistent with its legacy image.
  tags_after = pre_oci_model.list_repository_tags(repository_ref, include_legacy_images=True)
  assert tags_after

  for filled_tag in tags_after:
    assert filled_tag.manifest_digest

    tag_manifest = pre_oci_model.get_manifest_for_tag(filled_tag)
    assert tag_manifest

    docker_image_id = filled_tag.legacy_image.docker_image_id
    image_with_parents = pre_oci_model.get_legacy_image(repository_ref, docker_image_id,
                                                        include_parents=True)

    parsed = tag_manifest.get_parsed_manifest()
    assert parsed.leaf_layer_v1_image_id == image_with_parents.docker_image_id

    expected_parent_ids = {parent.docker_image_id for parent in image_with_parents.parents}
    assert parsed.parent_image_ids == expected_parent_ids