From 24320bb78362e683d95610bc0ca2c67e4d552ce3 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 2 Sep 2019 17:23:12 -0700 Subject: [PATCH] Re-implement select_object_content implementation This change fixes multiple issues - calculates proper CRC for the entire message as per spec - handles unicode boundaries properly for special delimiters - handles zero payload 'Cont' event messages --- minio/helpers.py | 6 +- minio/select_object_reader.py | 261 +++++++++++++++------------------- 2 files changed, 118 insertions(+), 149 deletions(-) diff --git a/minio/helpers.py b/minio/helpers.py index 8112d5f2..8337b68c 100644 --- a/minio/helpers.py +++ b/minio/helpers.py @@ -56,14 +56,14 @@ DEFAULT_PART_SIZE = MIN_PART_SIZE # Currently its 5MiB -# Select Object Content -READ_SIZE_SELECT = 32 * 1024 # Buffer size SQL = 'SQL' # Value for ExpressionType EVENT_RECORDS = 'Records' # Event Type is Records EVENT_PROGRESS = 'Progress' # Event Type Progress EVENT_STATS = 'Stats' # Event Type Stats -EVENT = 'event' # Message Type is event +EVENT_CONT = 'Cont' # Event Type continue EVENT_END = 'End' # Event Type is End +EVENT_CONTENT_TYPE = "text/xml" # Event content xml type +EVENT = 'event' # Message Type is event ERROR = 'error' # Message Type is error _VALID_BUCKETNAME_REGEX = re.compile('^[a-z0-9][a-z0-9\\.\\-]+[a-z0-9]$') diff --git a/minio/select_object_reader.py b/minio/select_object_reader.py index 64ccc57a..5391119d 100644 --- a/minio/select_object_reader.py +++ b/minio/select_object_reader.py @@ -23,16 +23,22 @@ from .error import InvalidXMLError from xml.etree.cElementTree import ParseError -from .helpers import (READ_SIZE_SELECT, EVENT_RECORDS, - EVENT_PROGRESS, EVENT_STATS, EVENT, EVENT_END, ERROR) +from .helpers import (EVENT_RECORDS, EVENT_PROGRESS, + EVENT_STATS, EVENT_CONT, + EVENT, EVENT_CONTENT_TYPE, + EVENT_END, ERROR) +class SelectMessageError(Exception): + ''' + Raised in case of message type 'error' + ''' + class CRCValidationError(Exception): ''' Raised in 
case of CRC mismatch ''' - def calculate_crc(value): ''' Returns the CRC using crc32 @@ -66,7 +72,7 @@ class SelectObjectReader(object): """ def __init__(self, response): self.response = response - self.remaining_bytes = bytearray() + self.remaining_bytes = bytes() self.stat = {} self.prog = {} @@ -76,10 +82,6 @@ def readable(self): def writeable(self): return False - @property - def closed(self): - return self.response.isclosed() - def close(self): self.response.close() @@ -94,82 +96,118 @@ def __extract_message(self): Process the response sent from server. https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html """ - rec = bytearray() - read_buffer = READ_SIZE_SELECT - # Messages read in chunks of read_buffer bytes - chunked_message = self.response.read(read_buffer) - total_byte_parsed = 0 - if len(chunked_message) == 0: - self.close() + + crc_bytes = io.BytesIO() + total_bytes_len = self.response.read(4) + if len(total_bytes_len) == 0: return b'' - # The first 4 bytes gives the total_byte_length and then - # complete message is extracted - while total_byte_parsed < read_buffer: - # Case 1 - If the total_byte_length is partially read - # in the chunked message , then complete the total_byte_length - # by reading the required bytes from response and then - # generate the complete message - if read_buffer - total_byte_parsed <= 4: - value = chunked_message[total_byte_parsed: - total_byte_parsed + - (read_buffer - total_byte_parsed) + - 1] - rem_bytes = self.response.read(4 - (read_buffer - - total_byte_parsed)) - message = value + rem_bytes + \ - self.response.read(byte_int(value+rem_bytes)-4) - end_status = self.__decode_message(message, rec) - total_byte_parsed = 0 - break - else: - total_byte_length = chunked_message[total_byte_parsed: total_byte_parsed + 4] - # Case 2 - Incomplete message in chunked message , - # so creating the complete message by reading the - # total_byte_length- len_read from the response message. 
- if total_byte_parsed + byte_int(total_byte_length) > read_buffer: - len_read = len(chunked_message[total_byte_parsed:]) - message = chunked_message[total_byte_parsed:] + \ - self.response.read(byte_int(total_byte_length)-len_read) - end_status = self.__decode_message(message, rec) - total_byte_parsed += byte_int(total_byte_length) - # Case 3- the complete message is present in chunked - # messsage. + total_length = byte_int(total_bytes_len) + header_bytes_len = self.response.read(4) + if len(header_bytes_len) == 0: + return b'' + + header_len = byte_int(header_bytes_len) + + crc_bytes.write(total_bytes_len) + crc_bytes.write(header_bytes_len) + + prelude_bytes_crc = self.response.read(4) + if not validate_crc(crc_bytes.getvalue(), prelude_bytes_crc): + self.close() + raise CRCValidationError( + {"Checksum Mismatch, PreludeCRC of " + + str(calculate_crc(crc_bytes.getvalue())) + + " does not equal expected CRC of " + + str(byte_int(prelude_bytes_crc))}) + + crc_bytes.write(prelude_bytes_crc) + + header_bytes = self.response.read(header_len) + if len(header_bytes) == 0: + raise SelectMessageError( + "Premature truncation of select message header"+ + ", server is sending corrupt message?") + + crc_bytes.write(header_bytes) + + header_map = self.__extract_header(header_bytes) + payload_length = total_length - header_len - int(16) + payload_bytes = b'' + return_bytes = b'' + if header_map["message-type"] == ERROR: + self.close() + raise SelectMessageError( + header_map["error-code"] + ":\"" + \ + header_map["error-message"] + "\"") + elif header_map["message-type"] == EVENT: + event_type = header_map["event-type"] + if event_type == EVENT_END: + self.close() + payload_bytes = b'' + elif event_type == EVENT_CONT: + payload_bytes = b'' + ## This is to indicate continue reading. + return_bytes = b'continue' + elif event_type == EVENT_STATS: + # Will fully ignoring progress events. 
+ content_type = header_map["content-type"] + if content_type == EVENT_CONTENT_TYPE: + payload_bytes = self.response.read(payload_length) + self.__read_stats(payload_bytes) else: - message = chunked_message[total_byte_parsed: - total_byte_parsed + - byte_int(total_byte_length)] - total_byte_parsed += byte_int(total_byte_length) - end_status = self.__decode_message(message, rec) - if end_status: - break - return rec + self.close() + raise SelectMessageError( + "Unrecognized content-type {0}".format(content_type)) + elif event_type == EVENT_RECORDS: + payload_bytes = self.response.read(payload_length) + return_bytes = payload_bytes + else: + raise SelectMessageError( + "Unrecognized message-type {0}".format(header_map["message-type"]) + ) + + crc_bytes.write(payload_bytes) - def __extract_header(self, header, header_length): + message_crc = self.response.read(4) + if len(message_crc) == 0: + return b'' + + if not validate_crc(crc_bytes.getvalue(), + message_crc): + self.close() + raise CRCValidationError( + {"Checksum Mismatch, MessageCRC of " + + str(calculate_crc(crc_bytes.getvalue())) + + " does not equal expected CRC of " + + str(byte_int(message_crc))}) + + return return_bytes + + def __extract_header(self, header_bytes): """ - populates the header map after reading the header + populates the header map after reading the header in bytes """ header_map = {} header_byte_parsed = 0 # While loop ends when all the headers present are read # header contains multipe headers - while header_byte_parsed < header_length: - header_name_byte_length = \ - byte_int(header[header_byte_parsed: header_byte_parsed+1]) + while header_byte_parsed < len(header_bytes): + header_name_byte_length = byte_int(header_bytes[header_byte_parsed:header_byte_parsed+1]) header_byte_parsed += 1 header_name = \ - header[header_byte_parsed: - header_byte_parsed+header_name_byte_length] + header_bytes[header_byte_parsed: + header_byte_parsed+header_name_byte_length] header_byte_parsed += 
header_name_byte_length # Header Value Type is of 1 bytes and is skipped header_byte_parsed += 1 value_string_byte_length = \ - byte_int(header[header_byte_parsed: - header_byte_parsed+2]) + byte_int(header_bytes[header_byte_parsed: + header_byte_parsed+2]) header_byte_parsed += 2 header_value = \ - header[header_byte_parsed: - header_byte_parsed+value_string_byte_length] + header_bytes[header_byte_parsed: + header_byte_parsed+value_string_byte_length] header_byte_parsed += value_string_byte_length header_map[header_name.decode("utf-8").lstrip(":")] = \ header_value.decode("utf-8").lstrip(":") @@ -188,77 +226,6 @@ def __read_stats(self, stats): elif attribute.tag == 'BytesReturned': self.stat['BytesReturned'] = attribute.text - def __parse_message(self, header_map, payload, payload_length, record): - ''' - Parses the message - ''' - if header_map["message-type"] == ERROR: - error = header_map["error-code"] + ":\"" +\ - header_map["error-message"] + "\"" - if header_map["message-type"] == EVENT: - # Fetch the content-type - content_type = header_map["content-type"] - # Fetch the event-type - event_type = header_map["event-type"] - if event_type == EVENT_RECORDS: - record += payload[0:payload_length] - elif event_type == EVENT_PROGRESS: - if content_type == "text/xml": - progress = payload[0:payload_length] - elif event_type == EVENT_STATS: - if content_type == "text/xml": - self.__read_stats(payload[0:payload_length]) - - def __decode_message(self, message, rec): - end_status = False - total_byte_length = message[0:4] # total_byte_length is of 4 bytes - headers_byte_length = message[4: 8] # headers_byte_length is 4 bytes - prelude_crc = message[8:12] # prelude_crc is of 4 bytes - header = message[12:12+byte_int(headers_byte_length)] - payload_length = byte_int(total_byte_length) - \ - byte_int(headers_byte_length) - int(16) - payload = message[12 + byte_int(headers_byte_length): - 12 + byte_int(headers_byte_length) + payload_length] - message_crc = message[12 + 
byte_int(headers_byte_length) + - payload_length: 12 + - byte_int(headers_byte_length) + - payload_length + 4] - - if not validate_crc(total_byte_length + headers_byte_length, - prelude_crc): - raise CRCValidationError( - {"Checksum Mismatch, MessageCRC of " + - str(calculate_crc(total_byte_length + - headers_byte_length)) + - " does not equal expected CRC of " + - str(byte_int(prelude_crc))}) - - if not validate_crc(message[0:len(message)-4], message_crc): - raise CRCValidationError( - {"Checksum Mismatch, MessageCRC of " + - str(calculate_crc(message)) + - " does not equal expected CRC of " + - str(byte_int(message_crc))}) - - header_map = self.__extract_header(header, byte_int(headers_byte_length)) - - if header_map["message-type"] == EVENT: - # Parse message only when event-type is Records, - # Progress, Stats. Break the loop if event type is End - # Do nothing if event type is Cont - if header_map["event-type"] == EVENT_RECORDS or \ - header_map["event-type"] == EVENT_PROGRESS or \ - header_map["event-type"] == EVENT_STATS: - self.__parse_message(header_map, payload, - payload_length, rec) - - if header_map["event-type"] == EVENT_END: - end_status = True - if header_map["message-type"] == ERROR: - self.__parse_message(header_map, payload, payload_length, rec) - end_status = True - return end_status - def __read(self, num_bytes): """ extract each record from the response body ... and buffer it. 
@@ -269,26 +236,28 @@ def __read(self, num_bytes): res = self.__extract_message() if len(res) == 0: return b'' + elif res == b'continue': + return res else: self.remaining_bytes = res + result = self.remaining_bytes if num_bytes < len(self.remaining_bytes): result = self.remaining_bytes[:num_bytes] - del self.remaining_bytes[:num_bytes] - return result - else: - left_in_buffer = self.remaining_bytes[:len(self.remaining_bytes)] - del self.remaining_bytes[:len(left_in_buffer)] - return left_in_buffer - def stream(self, num_bytes): + self.remaining_bytes = self.remaining_bytes[len(result):] + return result + + def stream(self, num_bytes=32*1024): """ streams the response """ - while True: + while not self.response.isclosed(): x = self.__read(num_bytes) if x == b'': break + if x == b'continue': + continue elif len(x) < num_bytes: x += self.__read(num_bytes-len(x)) - yield x.decode('utf-8') if isinstance(x, bytearray) else x + yield x.decode('utf-8', errors='ignore')