diff --git a/data/model/storage.py b/data/model/storage.py
index 5acb8ce39..8cdcf42f9 100644
--- a/data/model/storage.py
+++ b/data/model/storage.py
@@ -46,6 +46,31 @@ def add_storage_placement(storage, location_name):
     pass
 
 
+def _orphaned_storage_query(candidate_ids):
+  """ Returns the subset of the candidate ImageStorage IDs representing storages that are no
+      longer referenced by images.
+  """
+  # Issue a union query to find all storages that are still referenced by a candidate storage. This
+  # is much faster than the group_by and having call we used to use here.
+  nonorphaned_queries = []
+  for counter, candidate_id in enumerate(candidate_ids):
+    query_alias = 'q{0}'.format(counter)
+    storage_subq = (ImageStorage
+                    .select(ImageStorage.id)
+                    .join(Image)
+                    .where(ImageStorage.id == candidate_id)
+                    .limit(1)
+                    .alias(query_alias))
+
+    nonorphaned_queries.append(ImageStorage
+                               .select(SQL('*'))
+                               .from_(storage_subq))
+
+  # Build the set of storages that are missing. These storages are orphaned.
+  nonorphaned_storage_ids = {storage.id for storage in _reduce_as_tree(nonorphaned_queries)}
+  return list(candidate_ids - nonorphaned_storage_ids)
+
+
 def garbage_collect_storage(storage_id_whitelist):
   if len(storage_id_whitelist) == 0:
     return
@@ -55,27 +80,21 @@ def garbage_collect_storage(storage_id_whitelist):
              get_layer_path(placement.storage))
             for placement in placements_query}
 
-  def orphaned_storage_query(select_base_query, candidates, group_by):
-    return (select_base_query
-            .switch(ImageStorage)
-            .join(Image, JOIN_LEFT_OUTER)
-            .where(ImageStorage.id << list(candidates))
-            .group_by(*group_by)
-            .having(fn.Count(Image.id) == 0))
-
   # Note: Both of these deletes must occur in the same transaction (unfortunately) because a
   # storage without any placement is invalid, and a placement cannot exist without a storage.
   # TODO(jake): We might want to allow for null storages on placements, which would allow us to
   # delete the storages, then delete the placements in a non-transaction.
   logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist)
   with db_transaction():
-    # Track all of the data that should be removed from blob storage
-    placements_to_remove = list(orphaned_storage_query(ImageStoragePlacement
-                                                       .select(ImageStoragePlacement,
-                                                               ImageStorage)
-                                                       .join(ImageStorage),
-                                                       storage_id_whitelist,
-                                                       (ImageStorage.id, ImageStoragePlacement.id)))
+    orphaned_storage_ids = _orphaned_storage_query(storage_id_whitelist)
+    if len(orphaned_storage_ids) == 0:
+      # Nothing to GC.
+      return
+
+    placements_to_remove = list(ImageStoragePlacement
+                                .select()
+                                .join(ImageStorage)
+                                .where(ImageStorage.id << orphaned_storage_ids))
 
     paths_to_remove = placements_query_to_paths_set(placements_to_remove)
 
@@ -89,28 +108,23 @@ def garbage_collect_storage(storage_id_whitelist):
       logger.debug('Removed %s image storage placements', placements_removed)
 
     # Remove all orphaned storages
-    # The comma after ImageStorage.id is VERY important, it makes it a tuple, which is a sequence
-    orphaned_storages = list(orphaned_storage_query(ImageStorage.select(ImageStorage.id),
-                                                    storage_id_whitelist,
-                                                    (ImageStorage.id,)).alias('osq'))
-    if len(orphaned_storages) > 0:
-      torrents_removed = (TorrentInfo
-                          .delete()
-                          .where(TorrentInfo.storage << orphaned_storages)
-                          .execute())
-      logger.debug('Removed %s torrent info records', torrents_removed)
+    torrents_removed = (TorrentInfo
+                        .delete()
+                        .where(TorrentInfo.storage << orphaned_storage_ids)
+                        .execute())
+    logger.debug('Removed %s torrent info records', torrents_removed)
 
-      signatures_removed = (ImageStorageSignature
-                            .delete()
-                            .where(ImageStorageSignature.storage << orphaned_storages)
-                            .execute())
-      logger.debug('Removed %s image storage signatures', signatures_removed)
-
-      storages_removed = (ImageStorage
+    signatures_removed = (ImageStorageSignature
                           .delete()
-                          .where(ImageStorage.id << orphaned_storages)
+                          .where(ImageStorageSignature.storage << orphaned_storage_ids)
                           .execute())
-      logger.debug('Removed %s image storage records', storages_removed)
+    logger.debug('Removed %s image storage signatures', signatures_removed)
+
+    storages_removed = (ImageStorage
+                        .delete()
+                        .where(ImageStorage.id << orphaned_storage_ids)
+                        .execute())
+    logger.debug('Removed %s image storage records', storages_removed)
 
   # We are going to make the conscious decision to not delete image storage blobs inside
   # transactions.
diff --git a/test/test_gc.py b/test/test_gc.py
index 871b4a8a7..2c438c1bc 100644
--- a/test/test_gc.py
+++ b/test/test_gc.py
@@ -1,9 +1,12 @@
 import unittest
 import time
 
+from peewee import fn, JOIN_LEFT_OUTER
+
 from app import app, storage
 from initdb import setup_database_for_testing, finished_database_for_testing
 from data import model, database
+from data.database import Image, ImageStorage, DerivedStorageForImage
 from endpoints.v2.manifest import _generate_and_store_manifest
 
 
@@ -12,6 +15,29 @@ PUBLIC_USER = 'public'
 
 REPO = 'somerepo'
 
+
+class assert_no_new_dangling_storages(object):
+  """ Specialized assertion for ensuring that GC cleans up all dangling storages.
+  """
+  def __init__(self):
+    self.existing_count = 0
+
+  def _get_dangling_count(self):
+    storage_ids = set([current.id for current in ImageStorage.select()])
+    referneced_by_image = set([image.storage_id for image in Image.select()])
+    referenced_by_derived = set([derived.derivative_id for derived in DerivedStorageForImage.select()])
+
+    return len(storage_ids - referneced_by_image - referenced_by_derived)
+
+  def __enter__(self):
+    self.existing_count = self._get_dangling_count()
+    return self
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    updated_count = self._get_dangling_count()
+    assert updated_count == self.existing_count
+
+
 class TestGarbageCollection(unittest.TestCase):
   @staticmethod
   def _set_tag_expiration_policy(namespace, expiration_s):
@@ -122,6 +148,7 @@ class TestGarbageCollection(unittest.TestCase):
 
       self.fail('Expected image %s to be deleted' % docker_image_id)
 
+
   def test_has_garbage(self):
     """ Remove all existing repositories, then add one without garbage, check, then add one with
         garbage, and check again.
@@ -167,144 +194,164 @@ class TestGarbageCollection(unittest.TestCase):
   def test_one_tag(self):
     """ Create a repository with a single tag, then remove that tag and verify that the repository
         is now empty. """
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'])
-    self.deleteTag(repository, 'latest')
-    self.assertDeleted(repository, 'i1', 'i2', 'i3')
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'])
+      self.deleteTag(repository, 'latest')
+      self.assertDeleted(repository, 'i1', 'i2', 'i3')
+
 
   def test_two_tags_unshared_images(self):
     """ Repository has two tags with no shared images between them. """
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['f1', 'f2'])
-    self.deleteTag(repository, 'latest')
-    self.assertDeleted(repository, 'i1', 'i2', 'i3')
-    self.assertNotDeleted(repository, 'f1', 'f2')
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['f1', 'f2'])
+      self.deleteTag(repository, 'latest')
+      self.assertDeleted(repository, 'i1', 'i2', 'i3')
+      self.assertNotDeleted(repository, 'f1', 'f2')
+
 
   def test_two_tags_shared_images(self):
     """ Repository has two tags with shared images. Deleting the tag should only remove the
         unshared images.
     """
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
-    self.deleteTag(repository, 'latest')
-    self.assertDeleted(repository, 'i2', 'i3')
-    self.assertNotDeleted(repository, 'i1', 'f1')
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
+      self.deleteTag(repository, 'latest')
+      self.assertDeleted(repository, 'i2', 'i3')
+      self.assertNotDeleted(repository, 'i1', 'f1')
+
 
   def test_unrelated_repositories(self):
     """ Two repositories with different images. Removing the tag from one leaves the other's
         images intact.
     """
-    repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
-    repository2 = self.createRepository(latest=['j1', 'j2', 'j3'], name='repo2')
+    with assert_no_new_dangling_storages():
+      repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
+      repository2 = self.createRepository(latest=['j1', 'j2', 'j3'], name='repo2')
 
-    self.deleteTag(repository1, 'latest')
+      self.deleteTag(repository1, 'latest')
+
+      self.assertDeleted(repository1, 'i1', 'i2', 'i3')
+      self.assertNotDeleted(repository2, 'j1', 'j2', 'j3')
 
-    self.assertDeleted(repository1, 'i1', 'i2', 'i3')
-    self.assertNotDeleted(repository2, 'j1', 'j2', 'j3')
 
   def test_related_repositories(self):
     """ Two repositories with shared images. Removing the tag from one leaves the other's
         images intact.
     """
-    repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
-    repository2 = self.createRepository(latest=['i1', 'i2', 'j1'], name='repo2')
+    with assert_no_new_dangling_storages():
+      repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
+      repository2 = self.createRepository(latest=['i1', 'i2', 'j1'], name='repo2')
 
-    self.deleteTag(repository1, 'latest')
+      self.deleteTag(repository1, 'latest')
+
+      self.assertDeleted(repository1, 'i3')
+      self.assertNotDeleted(repository2, 'i1', 'i2', 'j1')
 
-    self.assertDeleted(repository1, 'i3')
-    self.assertNotDeleted(repository2, 'i1', 'i2', 'j1')
 
   def test_inaccessible_repositories(self):
     """ Two repositories under different namespaces should result in the images being deleted
         but not completely removed from the database.
     """
-    repository1 = self.createRepository(namespace=ADMIN_ACCESS_USER, latest=['i1', 'i2', 'i3'])
-    repository2 = self.createRepository(namespace=PUBLIC_USER, latest=['i1', 'i2', 'i3'])
+    with assert_no_new_dangling_storages():
+      repository1 = self.createRepository(namespace=ADMIN_ACCESS_USER, latest=['i1', 'i2', 'i3'])
+      repository2 = self.createRepository(namespace=PUBLIC_USER, latest=['i1', 'i2', 'i3'])
 
-    self.deleteTag(repository1, 'latest')
-    self.assertDeleted(repository1, 'i1', 'i2', 'i3')
-    self.assertNotDeleted(repository2, 'i1', 'i2', 'i3')
+      self.deleteTag(repository1, 'latest')
+      self.assertDeleted(repository1, 'i1', 'i2', 'i3')
+      self.assertNotDeleted(repository2, 'i1', 'i2', 'i3')
 
 
   def test_multiple_shared_images(self):
     """ Repository has multiple tags with shared images. Selectively deleting the tags, and
         verifying at each step.
     """
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
-                                       third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
+                                         third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
 
-    # Delete tag other. Should delete f2, since it is not shared.
-    self.deleteTag(repository, 'other')
-    self.assertDeleted(repository, 'f2')
-    self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3', 'f1')
+      # Delete tag other. Should delete f2, since it is not shared.
+      self.deleteTag(repository, 'other')
+      self.assertDeleted(repository, 'f2')
+      self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3', 'f1')
 
-    # Move tag fourth to i3. This should remove f1 since it is no longer referenced.
-    self.moveTag(repository, 'fourth', 'i3')
-    self.assertDeleted(repository, 'f1')
-    self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3')
+      # Move tag fourth to i3. This should remove f1 since it is no longer referenced.
+      self.moveTag(repository, 'fourth', 'i3')
+      self.assertDeleted(repository, 'f1')
+      self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3')
 
-    # Delete tag 'latest'. This should do nothing since fourth is on the same branch.
-    self.deleteTag(repository, 'latest')
-    self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3')
+      # Delete tag 'latest'. This should do nothing since fourth is on the same branch.
+      self.deleteTag(repository, 'latest')
+      self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3')
 
-    # Delete tag 'third'. This should remove t1->t3.
-    self.deleteTag(repository, 'third')
-    self.assertDeleted(repository, 't1', 't2', 't3')
-    self.assertNotDeleted(repository, 'i1', 'i2', 'i3')
+      # Delete tag 'third'. This should remove t1->t3.
+      self.deleteTag(repository, 'third')
+      self.assertDeleted(repository, 't1', 't2', 't3')
+      self.assertNotDeleted(repository, 'i1', 'i2', 'i3')
 
-    # Add tag to i1.
-    self.moveTag(repository, 'newtag', 'i1')
-    self.assertNotDeleted(repository, 'i1', 'i2', 'i3')
+      # Add tag to i1.
+      self.moveTag(repository, 'newtag', 'i1')
+      self.assertNotDeleted(repository, 'i1', 'i2', 'i3')
 
-    # Delete tag 'fourth'. This should remove i2 and i3.
-    self.deleteTag(repository, 'fourth')
-    self.assertDeleted(repository, 'i2', 'i3')
-    self.assertNotDeleted(repository, 'i1')
+      # Delete tag 'fourth'. This should remove i2 and i3.
+      self.deleteTag(repository, 'fourth')
+      self.assertDeleted(repository, 'i2', 'i3')
+      self.assertNotDeleted(repository, 'i1')
+
+      # Delete tag 'newtag'. This should remove the remaining image.
+      self.deleteTag(repository, 'newtag')
+      self.assertDeleted(repository, 'i1')
 
-    # Delete tag 'newtag'. This should remove the remaining image.
-    self.deleteTag(repository, 'newtag')
-    self.assertDeleted(repository, 'i1')
 
   def test_empty_gc(self):
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
-                                       third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
+                                         third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
+
+      self.gcNow(repository)
+      self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3', 'f1', 'f2')
 
-    self.gcNow(repository)
-    self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3', 'f1', 'f2')
 
   def test_time_machine_no_gc(self):
     """ Repository has two tags with shared images. Deleting the tag should not remove any images
     """
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
-    self._set_tag_expiration_policy(repository.namespace_user.username, 60*60*24)
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
+      self._set_tag_expiration_policy(repository.namespace_user.username, 60*60*24)
+
+      self.deleteTag(repository, 'latest')
+      self.assertNotDeleted(repository, 'i2', 'i3')
+      self.assertNotDeleted(repository, 'i1', 'f1')
 
-    self.deleteTag(repository, 'latest')
-    self.assertNotDeleted(repository, 'i2', 'i3')
-    self.assertNotDeleted(repository, 'i1', 'f1')
 
   def test_time_machine_gc(self):
     """ Repository has two tags with shared images. Deleting the second tag should cause the images
         for the first deleted tag to gc.
     """
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
 
-    self._set_tag_expiration_policy(repository.namespace_user.username, 1)
+      self._set_tag_expiration_policy(repository.namespace_user.username, 1)
 
-    self.deleteTag(repository, 'latest')
-    self.assertNotDeleted(repository, 'i2', 'i3')
-    self.assertNotDeleted(repository, 'i1', 'f1')
+      self.deleteTag(repository, 'latest')
+      self.assertNotDeleted(repository, 'i2', 'i3')
+      self.assertNotDeleted(repository, 'i1', 'f1')
 
-    time.sleep(2)
+      time.sleep(2)
+
+      self.deleteTag(repository, 'other')  # This will cause the images associated with latest to gc
+      self.assertDeleted(repository, 'i2', 'i3')
+      self.assertNotDeleted(repository, 'i1', 'f1')
 
-    self.deleteTag(repository, 'other')  # This will cause the images associated with latest to gc
-    self.assertDeleted(repository, 'i2', 'i3')
-    self.assertNotDeleted(repository, 'i1', 'f1')
 
   def test_manifest_gc(self):
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
-    _generate_and_store_manifest(ADMIN_ACCESS_USER, REPO, 'latest')
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
+      _generate_and_store_manifest(ADMIN_ACCESS_USER, REPO, 'latest')
 
-    self._set_tag_expiration_policy(repository.namespace_user.username, 0)
+      self._set_tag_expiration_policy(repository.namespace_user.username, 0)
 
-    self.deleteTag(repository, 'latest')
-    self.assertDeleted(repository, 'i2', 'i3')
+      self.deleteTag(repository, 'latest')
+      self.assertDeleted(repository, 'i2', 'i3')
 
 
 if __name__ == '__main__':