2016-02-24 21:01:27 +00:00
import unittest
import json
2016-02-25 20:58:42 +00:00
import os
2016-02-24 21:01:27 +00:00
from httmock import urlmatch , all_requests , HTTMock
from app import app , config_provider , storage , notification_queue
2016-03-17 16:59:27 +00:00
from endpoints . notificationevent import VulnerabilityFoundEvent
2016-02-24 21:01:27 +00:00
from initdb import setup_database_for_testing , finished_database_for_testing
from util . secscan . api import SecurityScannerAPI , AnalyzeLayerException
from util . secscan . analyzer import LayerAnalyzer
2016-02-25 20:58:42 +00:00
from util . secscan . notifier import process_notification_data
2016-02-24 21:01:27 +00:00
from data import model
2016-03-19 00:28:06 +00:00
from workers . security_notification_worker import SecurityNotificationWorker
2016-02-24 21:01:27 +00:00
# Namespace and repository names the tests below use when looking up tags,
# images and repositories through the data model.
ADMIN_ACCESS_USER = 'devtable'
SIMPLE_REPO = 'simple'
COMPLEX_REPO = 'complex'

# NOTE(review): not referenced in the visible part of this module; kept so any
# external user of the name keeps working.
_PORT_NUMBER = 5001
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
def get_layer_failure_mock(url, request):
    """Answer any layer lookup with a 404 and an 'Unknown layer' error document."""
    return {
        'status_code': 404,
        'content': json.dumps({'Error': {'Message': 'Unknown layer'}}),
    }
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_badrequest_mock(url, request):
    """Answer a layer analysis request with a 400 and a 'Bad request' error document."""
    return {
        'status_code': 400,
        'content': json.dumps({'Error': {'Message': 'Bad request'}}),
    }
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_internal_mock(url, request):
    """Answer a layer analysis request with a 500 and an 'Internal server error' document."""
    return {
        'status_code': 500,
        'content': json.dumps({'Error': {'Message': 'Internal server error'}}),
    }
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_failure_mock(url, request):
    """Answer a layer analysis request with a 422 and a 'Bad layer' error document."""
    return {
        'status_code': 422,
        'content': json.dumps({'Error': {'Message': 'Bad layer'}}),
    }
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_success_mock(url, request):
    """Validate a layer analysis request and answer it with a canned 201 response.

    The request body must be JSON containing a 'Layer' object with 'Path',
    'Name' and 'Format' keys; the first missing piece yields a 400 whose
    content names it. A valid request receives a fixed layer document.
    """
    body_data = json.loads(request.body)
    if 'Layer' not in body_data:
        return {'status_code': 400, 'content': 'Missing body'}

    layer = body_data['Layer']
    # Checked in this order so the reported field matches the first one missing.
    for required_field in ('Path', 'Name', 'Format'):
        if required_field not in layer:
            return {'status_code': 400, 'content': 'Missing %s' % required_field}

    return {
        'status_code': 201,
        'content': json.dumps({
            "Layer": {
                "Name": "523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6",
                "Path": "/mnt/layers/523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6/layer.tar",
                "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2",
                "Format": "Docker",
                "IndexedByVersion": 1,
            },
        }),
    }
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
def get_layer_success_mock(url, request):
    """Answer a layer lookup with a fixed layer document.

    The 'coreutils' feature is included only when the request URL mentions
    'features', and it carries the CVE-2014-9471 entry only when the URL also
    mentions 'vulnerabilities'.
    """
    # Decide on the vulnerability list *before* building the feature that
    # references it: rebinding the name afterwards would leave the feature
    # pointing at the populated list.
    vulnerabilities = []
    if 'vulnerabilities' in request.url:
        vulnerabilities = [
            {
                "Name": "CVE-2014-9471",
                "Namespace": "debian:8",
                "Description": "The parse_datetime function in GNU coreutils allows remote attackers to cause a denial of service (crash) or possibly execute arbitrary code via a crafted date string, as demonstrated by the \"--date=TZ=\"123\"345\" @1\" string to the touch or date command.",
                "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
                "Severity": "Low",
                "FixedBy": "9.23-5",
            },
        ]

    features = []
    if 'features' in request.url:
        features = [
            {
                "Name": "coreutils",
                "Namespace": "debian:8",
                "Version": "8.23-4",
                "Vulnerabilities": vulnerabilities,
            },
        ]

    return json.dumps({
        "Layer": {
            "Name": "17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52",
            "Namespace": "debian:8",
            "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2",
            "IndexedByVersion": 1,
            "Features": features,
        },
    })
@all_requests
def response_content(url, request):
    """Catch-all handler: answer any request with a 500 'Unknown endpoint'."""
    return {'status_code': 500, 'content': 'Unknown endpoint'}
class TestSecurityScanner(unittest.TestCase):
    """Tests for the security scanner API wrapper, layer analyzer and notifications.

    Scanner HTTP calls are answered by the HTTMock handlers each test installs.
    """

    def setUp(self):
        """Prepare fake storage, the test database, a request context and the API."""
        # Enable direct download in fake storage.
        storage.put_content(['local_us'], 'supports_direct_download', 'true')

        # Setup the database with fake storage.
        force_rebuild = os.environ.get('SKIP_REBUILD') != 'true'
        setup_database_for_testing(self, with_storage=True, force_rebuild=force_rebuild)

        self.app = app.test_client()
        self.ctx = app.test_request_context()
        self.ctx.__enter__()

        self.api = SecurityScannerAPI(app.config, config_provider, storage)

    def tearDown(self):
        """Reverse setUp: reset the storage flag, finish the database, leave the context."""
        storage.put_content(['local_us'], 'supports_direct_download', 'false')
        finished_database_for_testing(self)
        self.ctx.__exit__(True, None, None)

    def assertAnalyzed(self, layer, isAnalyzed, engineVersion):
        """Assert the indexed flag and engine version on `layer` and on all its parents."""
        self.assertEqual(isAnalyzed, layer.security_indexed)
        self.assertEqual(engineVersion, layer.security_indexed_engine)

        # Ensure all parent layers are marked as analyzed.
        parents = model.image.get_parent_images(ADMIN_ACCESS_USER, SIMPLE_REPO, layer)
        for parent in parents:
            self.assertEqual(isAnalyzed, parent.security_indexed)
            self.assertEqual(engineVersion, parent.security_indexed_engine)

    def test_get_layer_success(self):
        """A successful lookup returns the layer document served by the mock."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        with HTTMock(get_layer_success_mock, response_content):
            result = self.api.get_layer_data(layer, include_vulnerabilities=True)
            self.assertIsNotNone(result)
            self.assertEqual(
                result['Layer']['Name'],
                '17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52')

    def test_get_layer_failure(self):
        """A 404 from the scanner makes the lookup return None."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        with HTTMock(get_layer_failure_mock, response_content):
            result = self.api.get_layer_data(layer, include_vulnerabilities=True)
            self.assertIsNone(result)

    def test_analyze_layer_success(self):
        """Successful analysis marks the layer and its parents indexed at engine 1."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertFalse(layer.security_indexed)
        self.assertEqual(-1, layer.security_indexed_engine)

        with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

            layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
            self.assertAnalyzed(layer, True, 1)

    def test_analyze_layer_failure(self):
        """A 422 leaves the layer unindexed but records engine version 1."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertFalse(layer.security_indexed)
        self.assertEqual(-1, layer.security_indexed_engine)

        with HTTMock(analyze_layer_failure_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

            layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
            self.assertAnalyzed(layer, False, 1)

    def test_analyze_layer_internal_error(self):
        """A 500 leaves both the indexed flag and the engine version untouched."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertFalse(layer.security_indexed)
        self.assertEqual(-1, layer.security_indexed_engine)

        with HTTMock(analyze_layer_internal_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

            layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
            self.assertAnalyzed(layer, False, -1)

    def test_analyze_layer_bad_request(self):
        """A 400 from the scanner surfaces as AnalyzeLayerException."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertFalse(layer.security_indexed)
        self.assertEqual(-1, layer.security_indexed_engine)

        with HTTMock(analyze_layer_badrequest_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            with self.assertRaises(AnalyzeLayerException):
                analyzer.analyze_recursively(layer)

    def test_analyze_layer_missing_storage(self):
        """With the layer's storage removed, the layer stays unindexed at engine 1."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertFalse(layer.security_indexed)
        self.assertEqual(-1, layer.security_indexed_engine)

        # Delete the storage for the layer.
        path = model.storage.get_layer_path(layer.storage)
        locations = app.config['DISTRIBUTED_STORAGE_PREFERENCE']
        storage.remove(locations, path)

        with HTTMock(analyze_layer_success_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

            layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
            self.assertEqual(False, layer.security_indexed)
            self.assertEqual(1, layer.security_indexed_engine)

    def test_analyze_layer_success_events(self):
        """Successful analysis of a vulnerable layer queues a vulnerability event."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertFalse(layer.security_indexed)
        self.assertEqual(-1, layer.security_indexed_engine)

        # Ensure there are no existing events.
        self.assertIsNone(notification_queue.get())

        # Add a repo event for the layer.
        repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
        model.notification.create_repo_notification(
            repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})

        with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

            layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
            self.assertAnalyzed(layer, True, 1)

            # Ensure an event was written for the tag.
            queue_item = notification_queue.get()
            self.assertIsNotNone(queue_item)

            body = json.loads(queue_item.body)
            # Compare sorted: the order in which tags are reported is not part
            # of what this test verifies.
            self.assertEqual(['latest', 'prod'], sorted(body['event_data']['tags']))
            self.assertEqual('CVE-2014-9471', body['event_data']['vulnerability']['id'])
            self.assertEqual('Low', body['event_data']['vulnerability']['priority'])
            self.assertTrue(body['event_data']['vulnerability']['has_fix'])

    def _get_notification_data(self, new_layer_ids, old_layer_ids, new_severity='Low'):
        """Build a scanner notification payload describing an updated CVE-TEST.

        new_layer_ids and old_layer_ids are stored under the 'New' and 'Old'
        sections as 'LayersIntroducingVulnerability'. new_severity is the
        severity of the 'New' vulnerability; the 'Old' one is always 'Low'.
        """
        return {
            "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a",
            "Created": "1456247389",
            "Notified": "1456246708",
            "Limit": 2,
            "New": {
                "Vulnerability": {
                    "Name": "CVE-TEST",
                    "Namespace": "debian:8",
                    "Description": "New CVE",
                    "Severity": new_severity,
                    "FixedIn": [
                        {
                            "Name": "grep",
                            "Namespace": "debian:8",
                            "Version": "2.25",
                        },
                    ],
                },
                "LayersIntroducingVulnerability": new_layer_ids,
            },
            "Old": {
                "Vulnerability": {
                    "Name": "CVE-TEST",
                    "Namespace": "debian:8",
                    "Description": "New CVE",
                    "Severity": "Low",
                    "FixedIn": [],
                },
                "LayersIntroducingVulnerability": old_layer_ids,
            },
        }

    def _get_delete_notification_data(self, old_layer_ids):
        """Build a notification payload that has only an 'Old' section for CVE-TEST."""
        return {
            "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a",
            "Created": "1456247389",
            "Notified": "1456246708",
            "Limit": 2,
            "Old": {
                "Vulnerability": {
                    "Name": "CVE-TEST",
                    "Namespace": "debian:8",
                    "Description": "New CVE",
                    "Severity": "Low",
                    "FixedIn": [],
                },
                "LayersIntroducingVulnerability": old_layer_ids,
            },
        }

    def test_notification_new_layers_not_vulnerable(self):
        """No event is queued when the scanner reports the new layer as not vulnerable."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

        # Add a repo event for the layer.
        repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
        model.notification.create_repo_notification(
            repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})

        @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
        def get_matching_layer_not_vulnerable(url, request):
            return json.dumps({
                "Layer": {
                    "Name": layer_id,
                    "Namespace": "debian:8",
                    "IndexedByVersion": 1,
                    "Features": [
                        {
                            "Name": "coreutils",
                            "Namespace": "debian:8",
                            "Version": "8.23-4",
                            "Vulnerabilities": [],  # Report not vulnerable.
                        },
                    ],
                },
            })

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(get_matching_layer_not_vulnerable, response_content):
            notification_data = self._get_notification_data([layer_id], [])
            self.assertTrue(process_notification_data(notification_data))

            # Ensure that there are no event queue items for the layer.
            self.assertIsNone(notification_queue.get())

    def test_notification_delete(self):
        """A notification with only an 'Old' section is processed without queuing events."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

        # Add a repo event for the layer.
        repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
        model.notification.create_repo_notification(
            repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        notification_data = self._get_delete_notification_data([layer_id])
        self.assertTrue(process_notification_data(notification_data))

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

    def test_notification_new_layers(self):
        """A newly vulnerable layer results in a queued event for its tags."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

        # Add a repo event for the layer.
        repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
        model.notification.create_repo_notification(
            repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})

        @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
        def get_matching_layer_vulnerable(url, request):
            return json.dumps({
                "Layer": {
                    "Name": layer_id,
                    "Namespace": "debian:8",
                    "IndexedByVersion": 1,
                    "Features": [
                        {
                            "Name": "coreutils",
                            "Namespace": "debian:8",
                            "Version": "8.23-4",
                            "Vulnerabilities": [
                                {
                                    "Name": "CVE-TEST",
                                    "Namespace": "debian:8",
                                    "Severity": "Low",
                                },
                            ],
                        },
                    ],
                },
            })

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(get_matching_layer_vulnerable, response_content):
            notification_data = self._get_notification_data([layer_id], [])
            self.assertTrue(process_notification_data(notification_data))

            # Ensure an event was written for the tag.
            queue_item = notification_queue.get()
            self.assertIsNotNone(queue_item)

            body = json.loads(queue_item.body)
            # Compare sorted: tag order is not what this test verifies.
            self.assertEqual(['latest', 'prod'], sorted(body['event_data']['tags']))
            self.assertEqual('CVE-TEST', body['event_data']['vulnerability']['id'])
            self.assertEqual('Low', body['event_data']['vulnerability']['priority'])
            self.assertTrue(body['event_data']['vulnerability']['has_fix'])

    def test_notification_no_new_layers(self):
        """No event is queued when the layer appears in both the old and new lists."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

        # Add a repo event for the layer.
        repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
        model.notification.create_repo_notification(
            repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(response_content):
            notification_data = self._get_notification_data([layer_id], [layer_id])
            self.assertTrue(process_notification_data(notification_data))

            # Ensure that there are no event queue items for the layer.
            self.assertIsNone(notification_queue.get())

    def test_notification_no_new_layers_increased_severity(self):
        """An unchanged layer list with a raised severity still queues an event."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

        # Add a repo event for the layer.
        repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
        notification = model.notification.create_repo_notification(
            repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})

        @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
        def get_matching_layer_vulnerable(url, request):
            return json.dumps({
                "Layer": {
                    "Name": layer_id,
                    "Namespace": "debian:8",
                    "IndexedByVersion": 1,
                    "Features": [
                        {
                            "Name": "coreutils",
                            "Namespace": "debian:8",
                            "Version": "8.23-4",
                            "Vulnerabilities": [
                                {
                                    "Name": "CVE-TEST",
                                    "Namespace": "debian:8",
                                    "Severity": "Low",
                                },
                            ],
                        },
                    ],
                },
            })

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(get_matching_layer_vulnerable, response_content):
            notification_data = self._get_notification_data(
                [layer_id], [layer_id], new_severity='Critical')
            self.assertTrue(process_notification_data(notification_data))

            # Ensure an event was written for the tag.
            queue_item = notification_queue.get()
            self.assertIsNotNone(queue_item)

            body = json.loads(queue_item.body)
            # Compare sorted: tag order is not what this test verifies.
            self.assertEqual(['latest', 'prod'], sorted(body['event_data']['tags']))
            self.assertEqual('CVE-TEST', body['event_data']['vulnerability']['id'])
            self.assertEqual('Critical', body['event_data']['vulnerability']['priority'])
            self.assertTrue(body['event_data']['vulnerability']['has_fix'])

            # Verify that an event would be raised.
            event_data = body['event_data']
            self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))

            # Create another notification with a matching level and verify it will be raised.
            notification = model.notification.create_repo_notification(
                repo, 'vulnerability_found', 'quay_notification', {}, {'level': 1})
            self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))

            # Create another notification with a higher level and verify it won't be raised.
            notification = model.notification.create_repo_notification(
                repo, 'vulnerability_found', 'quay_notification', {}, {'level': 0})
            self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification))

    def test_notification_worker(self):
        """The worker rejects unknown notifications and pages through known ones."""
        pages_called = []

        @urlmatch(netloc=r'(.*\.)?mockclairservice',
                  path=r'/v1/notifications/somenotification$', method='DELETE')
        def delete_notification(url, request):
            pages_called.append('DELETE')
            return {'status_code': 201, 'content': ''}

        @urlmatch(netloc=r'(.*\.)?mockclairservice',
                  path=r'/v1/notifications/somenotification$', method='GET')
        def get_notification(url, request):
            if 'page=nextpage' in url.query:
                # Second (final) page: no 'NextPage' marker in the payload.
                pages_called.append('GET-2')

                layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod')
                layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

                data = {
                    'Notification': self._get_notification_data([layer_id], [layer_id]),
                }
                return json.dumps(data)

            # First page: points the caller at 'nextpage'.
            pages_called.append('GET-1')

            layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
            layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

            notification_data = self._get_notification_data([layer_id], [layer_id])
            notification_data['NextPage'] = 'nextpage'

            data = {
                'Notification': notification_data,
            }
            return json.dumps(data)

        @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/(.*)')
        def unknown_notification(url, request):
            return {'status_code': 404, 'content': 'Unknown notification'}

        # Test with an unknown notification.
        with HTTMock(get_notification, unknown_notification):
            worker = SecurityNotificationWorker(None)
            self.assertFalse(worker.process_queue_item({
                'Name': 'unknownnotification',
            }))

        # Test with a known notification with pages.
        data = {
            'Name': 'somenotification',
        }

        with HTTMock(get_notification, delete_notification, unknown_notification):
            worker = SecurityNotificationWorker(None)
            self.assertTrue(worker.process_queue_item(data))

            self.assertEqual(['GET-1', 'GET-2', 'DELETE'], pages_called)
2016-02-24 21:01:27 +00:00
# Run the tests when this module is executed directly.
if __name__ == '__main__':
    unittest.main()