import unittest
import tarfile

from StringIO import StringIO
from util.streamlayerformat import StreamLayerMerger, AUFS_WHITEOUT
from util.tarlayerformat import TarLayerReadException

class TestStreamLayerMerger(unittest.TestCase):
  def create_layer(self, **kwargs):
    """ Builds a gzipped tar layer in memory and returns the archive data as a string.

        Every keyword argument describes one archive entry: the keyword NAME is used as the
        file's contents and the keyword VALUE is the path stored in the archive. The special
        keyword `_` marks its path as deleted: rather than a regular file, an empty entry is
        written whose final path component is prefixed with AUFS_WHITEOUT (one trailing
        slash on the path, if present, is dropped first).
    """
    buf = StringIO()
    with tarfile.open(fileobj=buf, mode='w:gz') as archive:
      for contents, path in kwargs.items():
        if contents == '_':
          # Deletion marker: strip a single trailing slash, then put the whiteout prefix in
          # front of the last path component. rpartition returns an empty head and
          # separator when the path has no directory part.
          if path.endswith('/'):
            path = path[:-1]

          head, sep, tail = path.rpartition('/')
          path = head + sep + AUFS_WHITEOUT + tail
          contents = ''

        entry = tarfile.TarInfo(name=path)
        entry.size = len(contents)
        archive.addfile(entry, fileobj=StringIO(contents))

    return buf.getvalue()

  def create_empty_layer(self):
    return ''

  def squash_layers(self, layers, path_prefix=None):
    """ Merges the given layers with StreamLayerMerger and returns the merged data.

        `layers` is a sequence of layer data strings. The merger is handed a callable that
        wraps each of them in a fresh StringIO on every call. `path_prefix` is forwarded to
        the merger unchanged. The pieces yielded by the merger's generator are joined into a
        single string.
    """
    def open_layer_streams():
      return list(map(StringIO, layers))

    layer_merger = StreamLayerMerger(open_layer_streams, path_prefix=path_prefix)
    return ''.join(layer_merger.get_generator())

  def assertHasFile(self, squashed, filename, contents):
    """ Asserts that the squashed tar data has a member `filename` whose data is `contents`.

        `squashed` is the archive data as a string; the compression is auto-detected by
        tarfile. A missing member surfaces as the KeyError raised by TarFile.getmember.
    """
    with tarfile.open(fileobj=StringIO(squashed), mode='r:*') as tar:
      member = tar.getmember(filename)
      # Compare against the member's raw data. Joining readlines() with a newline would
      # double every line break (readlines() keeps the terminators), so multi-line
      # contents could never compare equal.
      self.assertEqual(contents, tar.extractfile(member).read())

  def assertDoesNotHaveFile(self, squashed, filename):
    """ Fails the test if the squashed tar data contains a member named `filename`.

        `squashed` is the archive data as a string; the compression is auto-detected by
        tarfile.
    """
    with tarfile.open(fileobj=StringIO(squashed), mode='r:*') as tar:
      try:
        tar.getmember(filename)
      except KeyError:
        # getmember reports a missing member with KeyError. Any other error (for example
        # a corrupt archive) is a genuine failure and must not count as "file absent".
        return

      self.fail('Filename %s found' % filename)

  def test_single_layer(self):
    # Squashing a single layer keeps every one of its files with the same contents.
    only_layer = self.create_layer(foo='some_file', bar='another_file', meh='third_file')
    merged = self.squash_layers([only_layer])

    expected = (('some_file', 'foo'), ('another_file', 'bar'), ('third_file', 'meh'))
    for path, body in expected:
      self.assertHasFile(merged, path, body)

  def test_multiple_layers(self):
    # Two layers with no overlapping paths: the squashed output holds the files of both.
    base = self.create_layer(foo='some_file', bar='another_file', meh='third_file')
    top = self.create_layer(top='top_file')
    merged = self.squash_layers([top, base])

    expected = (
      ('some_file', 'foo'),
      ('another_file', 'bar'),
      ('third_file', 'meh'),
      ('top_file', 'top'),
    )
    for path, body in expected:
      self.assertHasFile(merged, path, body)

  def test_multiple_layers_dot(self):
    # Paths written with a leading './' are found in the squashed output under that same
    # './' name, next to files that were stored without the prefix.
    base = self.create_layer(foo='./some_file', bar='another_file', meh='./third_file')
    top = self.create_layer(top='top_file')
    merged = self.squash_layers([top, base])

    expected = (
      ('./some_file', 'foo'),
      ('another_file', 'bar'),
      ('./third_file', 'meh'),
      ('top_file', 'top'),
    )
    for path, body in expected:
      self.assertHasFile(merged, path, body)

  def test_multiple_layers_overwrite(self):
    # When both layers contain 'another_file', the copy from the layer listed first wins.
    base = self.create_layer(foo='some_file', bar='another_file', meh='third_file')
    top = self.create_layer(top='another_file')
    merged = self.squash_layers([top, base])

    expected = (('some_file', 'foo'), ('third_file', 'meh'), ('another_file', 'top'))
    for path, body in expected:
      self.assertHasFile(merged, path, body)

  def test_multiple_layers_overwrite_base_dot(self):
    # The second layer stores './another_file' and the first stores 'another_file': the
    # first layer's copy wins and the './' spelling is absent from the output.
    base = self.create_layer(foo='some_file', bar='./another_file', meh='third_file')
    top = self.create_layer(top='another_file')
    merged = self.squash_layers([top, base])

    expected = (('some_file', 'foo'), ('third_file', 'meh'), ('another_file', 'top'))
    for path, body in expected:
      self.assertHasFile(merged, path, body)

    self.assertDoesNotHaveFile(merged, './another_file')

  def test_multiple_layers_overwrite_top_dot(self):
    # The first layer stores './another_file' and the second stores 'another_file': the
    # first layer's copy wins under its './' name and the plain spelling is absent.
    base = self.create_layer(foo='some_file', bar='another_file', meh='third_file')
    top = self.create_layer(top='./another_file')
    merged = self.squash_layers([top, base])

    expected = (('some_file', 'foo'), ('third_file', 'meh'), ('./another_file', 'top'))
    for path, body in expected:
      self.assertHasFile(merged, path, body)

    self.assertDoesNotHaveFile(merged, 'another_file')

  def test_deleted_file(self):
    # A deletion marker for 'another_file' in the first layer removes the second layer's
    # copy; the other files are untouched.
    base = self.create_layer(foo='some_file', bar='another_file', meh='third_file')
    top = self.create_layer(_='another_file')
    merged = self.squash_layers([top, base])

    self.assertHasFile(merged, 'some_file', 'foo')
    self.assertHasFile(merged, 'third_file', 'meh')
    self.assertDoesNotHaveFile(merged, 'another_file')

  def test_deleted_readded_file(self):
    # 'another_file' exists in the last layer, is marked deleted in the middle layer and is
    # added again in the first layer: the first layer's contents are what remains.
    bottom = self.create_layer(bar='another_file')
    middle = self.create_layer(foo='some_file', _='another_file', meh='third_file')
    top = self.create_layer(newagain='another_file')
    merged = self.squash_layers([top, middle, bottom])

    expected = (('some_file', 'foo'), ('third_file', 'meh'), ('another_file', 'newagain'))
    for path, body in expected:
      self.assertHasFile(merged, path, body)

  def test_deleted_in_lower_layer(self):
    # A deletion marker in the middle layer removes 'deleted_file' from the last layer even
    # though the first layer never mentions it.
    bottom = self.create_layer(bar='deleted_file')
    middle = self.create_layer(foo='some_file', _='deleted_file', meh='third_file')
    top = self.create_layer(top='top_file')
    merged = self.squash_layers([top, middle, bottom])

    expected = (('some_file', 'foo'), ('third_file', 'meh'), ('top_file', 'top'))
    for path, body in expected:
      self.assertHasFile(merged, path, body)

    self.assertDoesNotHaveFile(merged, 'deleted_file')

  def test_deleted_in_lower_layer_with_added_dot(self):
    # The file is stored as './deleted_file' while the deletion marker names plain
    # 'deleted_file'; 'deleted_file' must not appear in the output.
    bottom = self.create_layer(something='./deleted_file')
    deleter = self.create_layer(_='deleted_file')

    merged = self.squash_layers([deleter, bottom])
    self.assertDoesNotHaveFile(merged, 'deleted_file')

  def test_deleted_in_lower_layer_with_deleted_dot(self):
    # Both the stored file and the deletion marker use the './deleted_file' spelling;
    # 'deleted_file' must not appear in the output.
    bottom = self.create_layer(something='./deleted_file')
    deleter = self.create_layer(_='./deleted_file')

    merged = self.squash_layers([deleter, bottom])
    self.assertDoesNotHaveFile(merged, 'deleted_file')

  def test_directory(self):
    # Overwriting one file inside 'foo/' leaves its sibling from the other layer in place.
    base = self.create_layer(foo='foo/some_file', bar='foo/another_file')
    top = self.create_layer(top='foo/some_file')
    merged = self.squash_layers([top, base])

    for path, body in (('foo/some_file', 'top'), ('foo/another_file', 'bar')):
      self.assertHasFile(merged, path, body)

  def test_sub_directory(self):
    # Overwriting 'foo/some_file' leaves the file in the nested 'foo/bar/' directory intact.
    base = self.create_layer(foo='foo/some_file', bar='foo/bar/another_file')
    top = self.create_layer(top='foo/some_file')
    merged = self.squash_layers([top, base])

    for path, body in (('foo/some_file', 'top'), ('foo/bar/another_file', 'bar')):
      self.assertHasFile(merged, path, body)

  def test_delete_directory(self):
    # A deletion marker for the directory 'foo/' removes every file stored beneath it.
    base = self.create_layer(foo='foo/some_file', bar='foo/another_file')
    top = self.create_layer(_='foo/')
    merged = self.squash_layers([top, base])

    for removed_path in ('foo/some_file', 'foo/another_file'):
      self.assertDoesNotHaveFile(merged, removed_path)

  def test_delete_sub_directory(self):
    # Deleting the nested directory 'foo/bar/' removes its file but keeps 'foo/some_file'.
    base = self.create_layer(foo='foo/some_file', bar='foo/bar/another_file')
    top = self.create_layer(_='foo/bar/')
    merged = self.squash_layers([top, base])

    self.assertDoesNotHaveFile(merged, 'foo/bar/another_file')
    self.assertHasFile(merged, 'foo/some_file', 'foo')

  def test_delete_sub_directory_with_dot(self):
    # The deletion marker is spelled './foo/bar/' while the files are stored without './':
    # the nested file is still removed and 'foo/some_file' is kept.
    base = self.create_layer(foo='foo/some_file', bar='foo/bar/another_file')
    top = self.create_layer(_='./foo/bar/')
    merged = self.squash_layers([top, base])

    self.assertDoesNotHaveFile(merged, 'foo/bar/another_file')
    self.assertHasFile(merged, 'foo/some_file', 'foo')

  def test_delete_sub_directory_with_subdot(self):
    # The files are stored with a leading './' while the deletion marker is plain
    # 'foo/bar/': the nested file is absent under both spellings, and './foo/some_file'
    # is kept.
    base = self.create_layer(foo='./foo/some_file', bar='./foo/bar/another_file')
    top = self.create_layer(_='foo/bar/')
    merged = self.squash_layers([top, base])

    for removed_path in ('foo/bar/another_file', './foo/bar/another_file'):
      self.assertDoesNotHaveFile(merged, removed_path)

    self.assertHasFile(merged, './foo/some_file', 'foo')

  def test_delete_directory_recreate(self):
    # 'foo/' is populated in the last layer, deleted in the middle layer and partially
    # recreated in the first: only the recreated file is present.
    bottom = self.create_layer(foo='foo/some_file', bar='foo/another_file')
    middle = self.create_layer(_='foo/')
    top = self.create_layer(baz='foo/some_file')
    merged = self.squash_layers([top, middle, bottom])

    self.assertHasFile(merged, 'foo/some_file', 'baz')
    self.assertDoesNotHaveFile(merged, 'foo/another_file')

  def test_delete_directory_prefix(self):
    # Deleting 'foo/' removes 'foo/another_file' but not 'foobar/some_file', whose
    # directory name merely starts with 'foo'.
    bottom = self.create_layer(foo='foobar/some_file', bar='foo/another_file')
    deleter = self.create_layer(_='foo/')
    merged = self.squash_layers([deleter, bottom])

    self.assertHasFile(merged, 'foobar/some_file', 'foo')
    self.assertDoesNotHaveFile(merged, 'foo/another_file')


  def test_delete_directory_pre_prefix(self):
    # Deleting 'foo/' removes 'foo/another_file' but not the deeper
    # 'foobar/baz/some_file', whose top directory merely starts with 'foo'.
    bottom = self.create_layer(foo='foobar/baz/some_file', bar='foo/another_file')
    deleter = self.create_layer(_='foo/')
    merged = self.squash_layers([deleter, bottom])

    self.assertHasFile(merged, 'foobar/baz/some_file', 'foo')
    self.assertDoesNotHaveFile(merged, 'foo/another_file')


  def test_delete_root_directory(self):
    # A deletion marker for 'build' (written without a trailing slash) removes the files
    # stored under 'build/'.
    bottom = self.create_layer(foo='build/first_file', bar='build/second_file')
    deleter = self.create_layer(_='build')
    merged = self.squash_layers([deleter, bottom])

    for removed_path in ('build/first_file', 'build/second_file'):
      self.assertDoesNotHaveFile(merged, removed_path)


  def test_tar_empty_layer(self):
    # A layer that is a tar archive with no entries does not disturb the other layer's files.
    populated = self.create_layer(foo='build/first_file', bar='build/second_file')
    entryless = self.create_layer()
    merged = self.squash_layers([entryless, populated])

    for path, body in (('build/first_file', 'foo'), ('build/second_file', 'bar')):
      self.assertHasFile(merged, path, body)


  def test_data_empty_layer(self):
    # A layer consisting of zero bytes of data does not disturb the other layer's files.
    populated = self.create_layer(foo='build/first_file', bar='build/second_file')
    zero_bytes = self.create_empty_layer()
    merged = self.squash_layers([zero_bytes, populated])

    for path, body in (('build/first_file', 'foo'), ('build/second_file', 'bar')):
      self.assertHasFile(merged, path, body)


  def test_broken_layer(self):
    # Squashing must raise TarLayerReadException, carrying the message
    # 'Could not read layer', when one of the layers is not valid tar data.
    valid_layer = self.create_layer(
      foo = 'build/first_file',
      bar = 'build/second_file')

    broken_layer = 'not valid data'

    try:
      self.squash_layers([broken_layer, valid_layer])
    except TarLayerReadException as ex:
      self.assertEqual('Could not read layer', ex.message)
    else:
      # Reported from the else clause so that only the squash call itself is guarded.
      self.fail('Expected exception')

  def test_single_layer_with_prefix(self):
    # With path_prefix='foo/', every file of the layer appears under 'foo/' in the output.
    only_layer = self.create_layer(foo='some_file', bar='another_file', meh='third_file')
    merged = self.squash_layers([only_layer], path_prefix='foo/')

    expected = (
      ('foo/some_file', 'foo'),
      ('foo/another_file', 'bar'),
      ('foo/third_file', 'meh'),
    )
    for path, body in expected:
      self.assertHasFile(merged, path, body)

  def test_multiple_layers_overwrite_with_prefix(self):
    # Overwriting still works with path_prefix='foo/': the first layer's 'another_file'
    # wins and every file is found under 'foo/'.
    base = self.create_layer(foo='some_file', bar='another_file', meh='third_file')
    top = self.create_layer(top='another_file')
    merged = self.squash_layers([top, base], path_prefix='foo/')

    expected = (
      ('foo/some_file', 'foo'),
      ('foo/third_file', 'meh'),
      ('foo/another_file', 'top'),
    )
    for path, body in expected:
      self.assertHasFile(merged, path, body)


  def test_superlong_filename(self):
    # A very long file name is still found under the 'foo/' prefix after squashing.
    long_name = 'this_is_the_filename_that_never_ends_it_goes_on_and_on_my_friend_some_people_started'
    only_layer = self.create_layer(meh=long_name)

    merged = self.squash_layers([only_layer], path_prefix='foo/')

    self.assertHasFile(merged, 'foo/' + long_name, 'meh')


  def test_superlong_prefix(self):
    # A very long path_prefix is applied to every file of the layer.
    long_prefix = 'foo/bar/baz/something/foo/bar/baz/anotherthing/whatever/this/is/a/really/long/filename/that/goes/here/'
    only_layer = self.create_layer(foo='some_file', bar='another_file', meh='third_file')

    merged = self.squash_layers([only_layer], path_prefix=long_prefix)

    expected = (('some_file', 'foo'), ('another_file', 'bar'), ('third_file', 'meh'))
    for name, body in expected:
      self.assertHasFile(merged, long_prefix + name, body)


# Run the tests in this module when the file is executed directly as a script.
if __name__ == '__main__':
  unittest.main()