From 88b9e80cbb8349dc58398672c5bb41e19c8f742b Mon Sep 17 00:00:00 2001
From: Jake Moshenko <jake.moshenko@coreos.com>
Date: Thu, 5 Nov 2015 16:28:12 -0500
Subject: [PATCH] Backfill the v1 checksums from imagestorage

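Adds a yield_random_entries helper so concurrent backfill workers can walk
a table in batches with a bounded chance of selecting the same rows, plus
scripts that backfill ImageStorage.content_checksum and Image.v1_checksum,
and a migration that runs the v1 checksum backfill.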
---
 ...722_backfill_parent_id_and_v1_checksums.py |  19 +++
 util/migrate/__init__.py                      |  60 ++++++++++
 util/migrate/backfill_content_checksums.py    | 108 ++++++++++++++++++
 util/migrate/backfill_v1_checksums.py         |  70 ++++++++++++
 4 files changed, 257 insertions(+)
 create mode 100644 data/migrations/versions/22af01f81722_backfill_parent_id_and_v1_checksums.py
 create mode 100644 util/migrate/backfill_content_checksums.py
 create mode 100644 util/migrate/backfill_v1_checksums.py

diff --git a/data/migrations/versions/22af01f81722_backfill_parent_id_and_v1_checksums.py b/data/migrations/versions/22af01f81722_backfill_parent_id_and_v1_checksums.py
new file mode 100644
index 000000000..e6d732dcc
--- /dev/null
+++ b/data/migrations/versions/22af01f81722_backfill_parent_id_and_v1_checksums.py
@@ -0,0 +1,19 @@
+"""Backfill parent id and v1 checksums
+
+Revision ID: 22af01f81722
+Revises: 2827d36939e4
+Create Date: 2015-11-05 16:24:43.679323
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '22af01f81722'
+down_revision = '2827d36939e4'
+
+from util.migrate.backfill_v1_checksums import backfill_checksums
+
+def upgrade(tables):
+    backfill_checksums()
+
+def downgrade(tables):
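+    # The backfilled checksums are additive and harmless to keep, so there is
+    # nothing to undo on downgrade.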
+    pass
diff --git a/util/migrate/__init__.py b/util/migrate/__init__.py
index 6b0a65feb..e1770742d 100644
--- a/util/migrate/__init__.py
+++ b/util/migrate/__init__.py
@@ -1,5 +1,12 @@
+import logging
+
 from sqlalchemy.types import TypeDecorator, Text
 from sqlalchemy.dialects.mysql import TEXT as MySQLText, LONGTEXT
+from random import shuffle
+
+
+logger = logging.getLogger(__name__)
+
 
 class UTF8LongText(TypeDecorator):
   """ Platform-independent UTF-8 LONGTEXT type.
@@ -14,3 +21,56 @@ class UTF8LongText(TypeDecorator):
       return dialect.type_descriptor(LONGTEXT(charset='utf8mb4', collation='utf8mb4_unicode_ci'))
     else:
       return dialect.type_descriptor(Text())
+
+
+def _chance_duplication(pop_size, samples):
+  """ The chance of randomly selecting a duplicate when you choose the specified number of samples
+      from the specified population size.
+  """
+  pairs = (samples * (samples - 1)) / 2.0
+  unique = (pop_size - 1.0)/pop_size
+  all_unique = pow(unique, pairs)
+  return 1 - all_unique
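+
+# For example, with a population of 10,000 rows, drawing 25 samples gives a
+# duplication chance of roughly 3%, i.e. _chance_duplication(10000, 25) ~= 0.03
+# (the worked example in the yield_random_entries docstring below).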
+
+
+def _num_checks(pop_size, desired):
+  """ Binary search for the proper number of entries to use to get the specified collision
+      probability.
+  """
+  s_max = pop_size
+  s_min = 0
+  last_test = -1
+  s_test = s_max
+
+  while s_max > s_min and last_test != s_test:
+    last_test = s_test
+    s_test = (s_max + s_min)/2
+    chance = _chance_duplication(pop_size, s_test)
+    if chance > desired:
+      s_max = s_test - 1
+    else:
+      s_min = s_test
+
+  return s_test
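+
+# _num_checks inverts that: _num_checks(10000, 0.03) converges on roughly 25,
+# the largest sample count whose duplication chance stays at or below the target.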
+
+
+def yield_random_entries(batch_query, batch_size, collision_chance):
+  """ This method will yield semi-random items from a query in a database friendly way until no
+      more items match the base query modifier. It will pull batches of batch_size from the query
+      and yield enough items from each batch so that concurrent workers have a reduced chance of
+      selecting the same items. For example, if your batches return 10,000 entries, and you desire
+      only a .03 collision_chance, we will only use 25 random entries before going back to the db
+      for a new batch.
+  """
+
+  # Seed with a dummy entry so the loop condition passes on the first iteration;
+  # it is discarded immediately by the query below.
+  all_candidates = [1]
+  while len(all_candidates) > 0:
+    all_candidates = list(batch_query().limit(batch_size))
+    shuffle(all_candidates)
+    num_selections = max(1, _num_checks(len(all_candidates), collision_chance))
+    logger.debug('Found %s/%s matching entries, processing %s', len(all_candidates), batch_size,
+                 num_selections)
+    candidates = all_candidates[0:num_selections]
+    for candidate in candidates:
+      yield candidate
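+
+
+# A minimal usage sketch (hypothetical model and field names). The batch_query
+# callable is re-run for every batch, so each pass only sees rows that still
+# need work:
+#
+#   def batch_query():
+#     return SomeModel.select(SomeModel.id).where(SomeModel.new_field >> None)
+#
+#   for candidate in yield_random_entries(batch_query, 10000, 0.03):
+#     fill_in_new_field(candidate)  # must make the row stop matching batch_query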
diff --git a/util/migrate/backfill_content_checksums.py b/util/migrate/backfill_content_checksums.py
new file mode 100644
index 000000000..645b5539e
--- /dev/null
+++ b/util/migrate/backfill_content_checksums.py
@@ -0,0 +1,108 @@
+import logging
+
+from peewee import (JOIN_LEFT_OUTER, CharField, BigIntegerField, BooleanField, ForeignKeyField,
+                    DateTimeField, TextField)
+
+from data.database import BaseModel, db, db_for_update, CloseForLongOperation
+from app import app, storage
+from digest import checksums
+from util.migrate import yield_random_entries
+
+
+logger = logging.getLogger(__name__)
+
+
+class Repository(BaseModel):
+  pass
+
+
+# Vendored copies of the tables we will write to, as they exist at the time of this migration
+class ImageStorage(BaseModel):
+  uuid = CharField(index=True, unique=True)
+  checksum = CharField(null=True)
+  image_size = BigIntegerField(null=True)
+  uncompressed_size = BigIntegerField(null=True)
+  uploading = BooleanField(default=True, null=True)
+  cas_path = BooleanField(default=True)
+  content_checksum = CharField(null=True, index=True)
+
+
+class Image(BaseModel):
+  docker_image_id = CharField(index=True)
+  repository = ForeignKeyField(Repository)
+  ancestors = CharField(index=True, default='/', max_length=64535, null=True)
+  storage = ForeignKeyField(ImageStorage, index=True, null=True)
+  created = DateTimeField(null=True)
+  comment = TextField(null=True)
+  command = TextField(null=True)
+  aggregate_size = BigIntegerField(null=True)
+  v1_json_metadata = TextField(null=True)
+  v1_checksum = CharField(null=True)
+
+
+class ImageStorageLocation(BaseModel):
+  name = CharField(unique=True, index=True)
+
+
+class ImageStoragePlacement(BaseModel):
+  storage = ForeignKeyField(ImageStorage)
+  location = ForeignKeyField(ImageStorageLocation)
+
+
+def _get_image_storage_locations(storage_id):
+  placements_query = (ImageStoragePlacement
+                      .select(ImageStoragePlacement, ImageStorageLocation)
+                      .join(ImageStorageLocation)
+                      .switch(ImageStoragePlacement)
+                      .join(ImageStorage, JOIN_LEFT_OUTER)
+                      .where(ImageStorage.id == storage_id))
+
+  locations = set()
+  for placement in placements_query:
+    locations.add(placement.location.name)
+
+  return locations
+
+
+def backfill_content_checksums():
+  """ Copies metadata from image storages to their images. """
+  logger.debug('Image content checksum backfill: Began execution')
+
+  def batch_query():
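+    # `>> None` is peewee's IS NULL: select storages that still lack a content
+    # checksum and are not mid-upload.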
+    return (ImageStorage
+            .select(ImageStorage.id, ImageStorage.uuid)
+            .where(ImageStorage.content_checksum >> None, ImageStorage.uploading == False))
+
+  for candidate_storage in yield_random_entries(batch_query, 10000, 0.1):
+    logger.debug('Computing content checksum for storage: %s', candidate_storage.uuid)
+
+    locations = _get_image_storage_locations(candidate_storage.id)
+
+    checksum = None
+    with CloseForLongOperation(app.config):
+      try:
+        # Compute the checksum
+        layer_path = storage.image_layer_path(candidate_storage.uuid)
+        with storage.stream_read_file(locations, layer_path) as layer_data_handle:
+          checksum = 'sha256:{0}'.format(checksums.sha256_file(layer_data_handle))
+      except Exception as exc:
+        logger.warning('Unable to compute checksum for storage: %s', candidate_storage.uuid)
+        checksum = 'unknown:{0}'.format(exc.__class__.__name__)
+
+    # Now update the ImageStorage with the checksum
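+    # db_for_update takes a row lock (SELECT ... FOR UPDATE, where the database
+    # supports it) before re-reading, so the duplicate-worker check below is
+    # race-free rather than best-effort.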
+    with app.config['DB_TRANSACTION_FACTORY'](db):
+      to_update = db_for_update(ImageStorage.get(ImageStorage.id == candidate_storage.id))
+      if to_update.content_checksum is not None:
+        logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid)
+      else:
+        logger.debug('Setting content checksum to %s for %s', checksum, candidate_storage.uuid)
+        to_update.content_checksum = checksum
+        to_update.save()
+
+if __name__ == "__main__":
+  logging.basicConfig(level=logging.DEBUG)
+  logging.getLogger('peewee').setLevel(logging.CRITICAL)
+  backfill_content_checksums()
diff --git a/util/migrate/backfill_v1_checksums.py b/util/migrate/backfill_v1_checksums.py
new file mode 100644
index 000000000..aef7af72e
--- /dev/null
+++ b/util/migrate/backfill_v1_checksums.py
@@ -0,0 +1,70 @@
+import logging
+
+from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
+                    TextField)
+from data.database import BaseModel, db, db_for_update
+from util.migrate import yield_random_entries
+from app import app
+
+
+logger = logging.getLogger(__name__)
+
+
+class Repository(BaseModel):
+  pass
+
+
+# Vendored copies of the tables we will write to, as they exist at the time of this migration
+class ImageStorage(BaseModel):
+  uuid = CharField(index=True, unique=True)
+  checksum = CharField(null=True)
+  image_size = BigIntegerField(null=True)
+  uncompressed_size = BigIntegerField(null=True)
+  uploading = BooleanField(default=True, null=True)
+  cas_path = BooleanField(default=True)
+  content_checksum = CharField(null=True, index=True)
+
+
+class Image(BaseModel):
+  docker_image_id = CharField(index=True)
+  repository = ForeignKeyField(Repository)
+  ancestors = CharField(index=True, default='/', max_length=64535, null=True)
+  storage = ForeignKeyField(ImageStorage, index=True, null=True)
+  created = DateTimeField(null=True)
+  comment = TextField(null=True)
+  command = TextField(null=True)
+  aggregate_size = BigIntegerField(null=True)
+  v1_json_metadata = TextField(null=True)
+  v1_checksum = CharField(null=True)
+
+
+def backfill_checksums():
+  """ Copies checksums from image storages to their images. """
+  logger.debug('Image v1 checksum backfill: Began execution')
+  def batch_query():
+    return (Image
+            .select(Image.id)
+            .join(ImageStorage)
+            .where(Image.v1_checksum >> None, ImageStorage.uploading == False,
+                   ~(ImageStorage.checksum >> None)))
+
+  for candidate_image in yield_random_entries(batch_query, 10000, 0.1):
+    logger.debug('Backfilling v1 checksum for image: %s', candidate_image.id)
+
+    with app.config['DB_TRANSACTION_FACTORY'](db):
+      try:
+        image = db_for_update(Image
+                              .select(Image, ImageStorage)
+                              .join(ImageStorage)
+                              .where(Image.id == candidate_image.id)).get()
+
+        image.v1_checksum = image.storage.checksum
+        image.save()
+      except Image.DoesNotExist:
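+        # The image was deleted between batch selection and this transaction;
+        # nothing is left to update.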
+        pass
+
+
+if __name__ == "__main__":
+  logging.basicConfig(level=logging.DEBUG)
+  logging.getLogger('peewee').setLevel(logging.CRITICAL)
+  backfill_checksums()