closes #2224: re-vendor the latest Azure Storage SDK for better performance

Signed-off-by: Yu Wang <yuwa@microsoft.com>
This commit is contained in:
Yu Wang (UC) 2017-04-13 18:05:38 -07:00 committed by Yu Wang
parent c3e06c6069
commit ac05d143d8
58 changed files with 7158 additions and 1674 deletions

View file

@ -15,12 +15,6 @@ const (
userDefinedMetadataHeaderPrefix = "X-Ms-Meta-"
)
// QueueServiceClient contains operations for Microsoft Azure Queue Storage
// Service.
type QueueServiceClient struct {
client Client
}
func pathForQueue(queue string) string { return fmt.Sprintf("/%s", queue) }
func pathForQueueMessages(queue string) string { return fmt.Sprintf("/%s/messages", queue) }
func pathForMessage(queue, name string) string { return fmt.Sprintf("/%s/messages/%s", queue, name) }
@ -151,16 +145,17 @@ type QueueMetadataResponse struct {
// See https://msdn.microsoft.com/en-us/library/azure/dd179348.aspx
func (c QueueServiceClient) SetMetadata(name string, metadata map[string]string) error {
uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{"comp": []string{"metadata"}})
metadata = c.client.protectUserAgent(metadata)
headers := c.client.getStandardHeaders()
for k, v := range metadata {
headers[userDefinedMetadataHeaderPrefix+k] = v
}
resp, err := c.client.exec("PUT", uri, headers, nil)
resp, err := c.client.exec(http.MethodPut, uri, headers, nil, c.auth)
if err != nil {
return err
}
defer resp.body.Close()
defer readAndCloseBody(resp.body)
return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
}
@ -179,11 +174,11 @@ func (c QueueServiceClient) GetMetadata(name string) (QueueMetadataResponse, err
qm.UserDefinedMetadata = make(map[string]string)
uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{"comp": []string{"metadata"}})
headers := c.client.getStandardHeaders()
resp, err := c.client.exec("GET", uri, headers, nil)
resp, err := c.client.exec(http.MethodGet, uri, headers, nil, c.auth)
if err != nil {
return qm, err
}
defer resp.body.Close()
defer readAndCloseBody(resp.body)
for k, v := range resp.headers {
if len(v) != 1 {
@ -212,11 +207,11 @@ func (c QueueServiceClient) GetMetadata(name string) (QueueMetadataResponse, err
func (c QueueServiceClient) CreateQueue(name string) error {
uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{})
headers := c.client.getStandardHeaders()
resp, err := c.client.exec("PUT", uri, headers, nil)
resp, err := c.client.exec(http.MethodPut, uri, headers, nil, c.auth)
if err != nil {
return err
}
defer resp.body.Close()
defer readAndCloseBody(resp.body)
return checkRespCode(resp.statusCode, []int{http.StatusCreated})
}
@ -225,18 +220,18 @@ func (c QueueServiceClient) CreateQueue(name string) error {
// See https://msdn.microsoft.com/en-us/library/azure/dd179436.aspx
func (c QueueServiceClient) DeleteQueue(name string) error {
uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{})
resp, err := c.client.exec("DELETE", uri, c.client.getStandardHeaders(), nil)
resp, err := c.client.exec(http.MethodDelete, uri, c.client.getStandardHeaders(), nil, c.auth)
if err != nil {
return err
}
defer resp.body.Close()
defer readAndCloseBody(resp.body)
return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
}
// QueueExists returns true if a queue with given name exists.
func (c QueueServiceClient) QueueExists(name string) (bool, error) {
uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{"comp": {"metadata"}})
resp, err := c.client.exec("GET", uri, c.client.getStandardHeaders(), nil)
resp, err := c.client.exec(http.MethodGet, uri, c.client.getStandardHeaders(), nil, c.auth)
if resp != nil && (resp.statusCode == http.StatusOK || resp.statusCode == http.StatusNotFound) {
return resp.statusCode == http.StatusOK, nil
}
@ -256,11 +251,11 @@ func (c QueueServiceClient) PutMessage(queue string, message string, params PutM
}
headers := c.client.getStandardHeaders()
headers["Content-Length"] = strconv.Itoa(nn)
resp, err := c.client.exec("POST", uri, headers, body)
resp, err := c.client.exec(http.MethodPost, uri, headers, body, c.auth)
if err != nil {
return err
}
defer resp.body.Close()
defer readAndCloseBody(resp.body)
return checkRespCode(resp.statusCode, []int{http.StatusCreated})
}
@ -269,11 +264,11 @@ func (c QueueServiceClient) PutMessage(queue string, message string, params PutM
// See https://msdn.microsoft.com/en-us/library/azure/dd179454.aspx
func (c QueueServiceClient) ClearMessages(queue string) error {
uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), url.Values{})
resp, err := c.client.exec("DELETE", uri, c.client.getStandardHeaders(), nil)
resp, err := c.client.exec(http.MethodDelete, uri, c.client.getStandardHeaders(), nil, c.auth)
if err != nil {
return err
}
defer resp.body.Close()
defer readAndCloseBody(resp.body)
return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
}
@ -284,7 +279,7 @@ func (c QueueServiceClient) ClearMessages(queue string) error {
func (c QueueServiceClient) GetMessages(queue string, params GetMessagesParameters) (GetMessagesResponse, error) {
var r GetMessagesResponse
uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), params.getParameters())
resp, err := c.client.exec("GET", uri, c.client.getStandardHeaders(), nil)
resp, err := c.client.exec(http.MethodGet, uri, c.client.getStandardHeaders(), nil, c.auth)
if err != nil {
return r, err
}
@ -300,7 +295,7 @@ func (c QueueServiceClient) GetMessages(queue string, params GetMessagesParamete
func (c QueueServiceClient) PeekMessages(queue string, params PeekMessagesParameters) (PeekMessagesResponse, error) {
var r PeekMessagesResponse
uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), params.getParameters())
resp, err := c.client.exec("GET", uri, c.client.getStandardHeaders(), nil)
resp, err := c.client.exec(http.MethodGet, uri, c.client.getStandardHeaders(), nil, c.auth)
if err != nil {
return r, err
}
@ -315,11 +310,11 @@ func (c QueueServiceClient) PeekMessages(queue string, params PeekMessagesParame
func (c QueueServiceClient) DeleteMessage(queue, messageID, popReceipt string) error {
uri := c.client.getEndpoint(queueServiceName, pathForMessage(queue, messageID), url.Values{
"popreceipt": {popReceipt}})
resp, err := c.client.exec("DELETE", uri, c.client.getStandardHeaders(), nil)
resp, err := c.client.exec(http.MethodDelete, uri, c.client.getStandardHeaders(), nil, c.auth)
if err != nil {
return err
}
defer resp.body.Close()
defer readAndCloseBody(resp.body)
return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
}
@ -335,10 +330,10 @@ func (c QueueServiceClient) UpdateMessage(queue string, messageID string, messag
}
headers := c.client.getStandardHeaders()
headers["Content-Length"] = fmt.Sprintf("%d", nn)
resp, err := c.client.exec("PUT", uri, headers, body)
resp, err := c.client.exec(http.MethodPut, uri, headers, body, c.auth)
if err != nil {
return err
}
defer resp.body.Close()
defer readAndCloseBody(resp.body)
return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
}