Skip to content

Commit

Permalink
Re-implement select_object_content implementation
Browse files Browse the repository at this point in the history
This change fixes multiple issues

- calculates proper CRC for the entire message as per spec
- handles unicode boundaries properly for special delimiters
- handle zero payload 'Cont' event messages
  • Loading branch information
harshavardhana authored and minio-trusted committed Sep 4, 2019
1 parent 84e57b6 commit 24320bb
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 149 deletions.
6 changes: 3 additions & 3 deletions minio/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@
DEFAULT_PART_SIZE = MIN_PART_SIZE # Currently its 5MiB


# Select Object Content
READ_SIZE_SELECT = 32 * 1024 # Buffer size
SQL = 'SQL' # Value for ExpressionType
EVENT_RECORDS = 'Records' # Event Type is Records
EVENT_PROGRESS = 'Progress' # Event Type Progress
EVENT_STATS = 'Stats' # Event Type Stats
EVENT = 'event' # Message Type is event
EVENT_CONT = 'Cont' # Event Type continue
EVENT_END = 'End' # Event Type is End
EVENT_CONTENT_TYPE = "text/xml" # Event content xml type
EVENT = 'event' # Message Type is event
ERROR = 'error' # Message Type is error

_VALID_BUCKETNAME_REGEX = re.compile('^[a-z0-9][a-z0-9\\.\\-]+[a-z0-9]$')
Expand Down
261 changes: 115 additions & 146 deletions minio/select_object_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,22 @@
from .error import InvalidXMLError
from xml.etree.cElementTree import ParseError

from .helpers import (READ_SIZE_SELECT, EVENT_RECORDS,
EVENT_PROGRESS, EVENT_STATS, EVENT, EVENT_END, ERROR)
from .helpers import (EVENT_RECORDS, EVENT_PROGRESS,
EVENT_STATS, EVENT_CONT,
EVENT, EVENT_CONTENT_TYPE,
EVENT_END, ERROR)


class SelectMessageError(Exception):
'''
Raised in case of message type 'error'
'''

class CRCValidationError(Exception):
'''
Raised in case of CRC mismatch
'''


def calculate_crc(value):
'''
Returns the CRC using crc32
Expand Down Expand Up @@ -66,7 +72,7 @@ class SelectObjectReader(object):
"""
def __init__(self, response):
self.response = response
self.remaining_bytes = bytearray()
self.remaining_bytes = bytes()
self.stat = {}
self.prog = {}

Expand All @@ -76,10 +82,6 @@ def readable(self):
def writeable(self):
return False

@property
def closed(self):
return self.response.isclosed()

def close(self):
self.response.close()

Expand All @@ -94,82 +96,118 @@ def __extract_message(self):
Process the response sent from server.
https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html
"""
rec = bytearray()
read_buffer = READ_SIZE_SELECT
# Messages read in chunks of read_buffer bytes
chunked_message = self.response.read(read_buffer)
total_byte_parsed = 0
if len(chunked_message) == 0:
self.close()

crc_bytes = io.BytesIO()
total_bytes_len = self.response.read(4)
if len(total_bytes_len) == 0:
return b''

# The first 4 bytes gives the total_byte_length and then
# complete message is extracted
while total_byte_parsed < read_buffer:
# Case 1 - If the total_byte_length is partially read
# in the chunked message , then complete the total_byte_length
# by reading the required bytes from response and then
# generate the complete message
if read_buffer - total_byte_parsed <= 4:
value = chunked_message[total_byte_parsed:
total_byte_parsed +
(read_buffer - total_byte_parsed) +
1]
rem_bytes = self.response.read(4 - (read_buffer -
total_byte_parsed))
message = value + rem_bytes + \
self.response.read(byte_int(value+rem_bytes)-4)
end_status = self.__decode_message(message, rec)
total_byte_parsed = 0
break
else:
total_byte_length = chunked_message[total_byte_parsed: total_byte_parsed + 4]
# Case 2 - Incomplete message in chunked message ,
# so creating the complete message by reading the
# total_byte_length- len_read from the response message.
if total_byte_parsed + byte_int(total_byte_length) > read_buffer:
len_read = len(chunked_message[total_byte_parsed:])
message = chunked_message[total_byte_parsed:] + \
self.response.read(byte_int(total_byte_length)-len_read)
end_status = self.__decode_message(message, rec)
total_byte_parsed += byte_int(total_byte_length)
# Case 3- the complete message is present in chunked
# messsage.
total_length = byte_int(total_bytes_len)
header_bytes_len = self.response.read(4)
if len(header_bytes_len) == 0:
return b''

header_len = byte_int(header_bytes_len)

crc_bytes.write(total_bytes_len)
crc_bytes.write(header_bytes_len)

prelude_bytes_crc = self.response.read(4)
if not validate_crc(crc_bytes.getvalue(), prelude_bytes_crc):
self.close()
raise CRCValidationError(
{"Checksum Mismatch, PreludeCRC of " +
str(calculate_crc(crc_bytes.getvalue())) +
" does not equal expected CRC of " +
str(byte_int(prelude_bytes_crc))})

crc_bytes.write(prelude_bytes_crc)

header_bytes = self.response.read(header_len)
if len(header_bytes) == 0:
raise SelectMessageError(
"Premature truncation of select message header"+
", server is sending corrupt message?")

crc_bytes.write(header_bytes)

header_map = self.__extract_header(header_bytes)
payload_length = total_length - header_len - int(16)
payload_bytes = b''
return_bytes = b''
if header_map["message-type"] == ERROR:
self.close()
raise SelectMessageError(
header_map["error-code"] + ":\"" + \
header_map["error-message"] + "\"")
elif header_map["message-type"] == EVENT:
event_type = header_map["event-type"]
if event_type == EVENT_END:
self.close()
payload_bytes = b''
elif event_type == EVENT_CONT:
payload_bytes = b''
## This is to indicate continue reading.
return_bytes = b'continue'
elif event_type == EVENT_STATS:
# Will fully ignoring progress events.
content_type = header_map["content-type"]
if content_type == EVENT_CONTENT_TYPE:
payload_bytes = self.response.read(payload_length)
self.__read_stats(payload_bytes)
else:
message = chunked_message[total_byte_parsed:
total_byte_parsed +
byte_int(total_byte_length)]
total_byte_parsed += byte_int(total_byte_length)
end_status = self.__decode_message(message, rec)
if end_status:
break
return rec
self.close()
raise SelectMessageError(
"Unrecognized content-type {0}".format(content_type))
elif event_type == EVENT_RECORDS:
payload_bytes = self.response.read(payload_length)
return_bytes = payload_bytes
else:
raise SelectMessageError(
"Unrecognized message-type {0}".format(header_map["message-type"])
)

crc_bytes.write(payload_bytes)

def __extract_header(self, header, header_length):
message_crc = self.response.read(4)
if len(message_crc) == 0:
return b''

if not validate_crc(crc_bytes.getvalue(),
message_crc):
self.close()
raise CRCValidationError(
{"Checksum Mismatch, MessageCRC of " +
str(calculate_crc(crc_bytes.getvalue())) +
" does not equal expected CRC of " +
str(byte_int(message_crc))})

return return_bytes

def __extract_header(self, header_bytes):
"""
populates the header map after reading the header
populates the header map after reading the header in bytes
"""
header_map = {}
header_byte_parsed = 0
# While loop ends when all the headers present are read
# header contains multipe headers
while header_byte_parsed < header_length:
header_name_byte_length = \
byte_int(header[header_byte_parsed: header_byte_parsed+1])
while header_byte_parsed < len(header_bytes):
header_name_byte_length = byte_int(header_bytes[header_byte_parsed:header_byte_parsed+1])
header_byte_parsed += 1
header_name = \
header[header_byte_parsed:
header_byte_parsed+header_name_byte_length]
header_bytes[header_byte_parsed:
header_byte_parsed+header_name_byte_length]
header_byte_parsed += header_name_byte_length
# Header Value Type is of 1 bytes and is skipped
header_byte_parsed += 1
value_string_byte_length = \
byte_int(header[header_byte_parsed:
header_byte_parsed+2])
byte_int(header_bytes[header_byte_parsed:
header_byte_parsed+2])
header_byte_parsed += 2
header_value = \
header[header_byte_parsed:
header_byte_parsed+value_string_byte_length]
header_bytes[header_byte_parsed:
header_byte_parsed+value_string_byte_length]
header_byte_parsed += value_string_byte_length
header_map[header_name.decode("utf-8").lstrip(":")] = \
header_value.decode("utf-8").lstrip(":")
Expand All @@ -188,77 +226,6 @@ def __read_stats(self, stats):
elif attribute.tag == 'BytesReturned':
self.stat['BytesReturned'] = attribute.text

def __parse_message(self, header_map, payload, payload_length, record):
'''
Parses the message
'''
if header_map["message-type"] == ERROR:
error = header_map["error-code"] + ":\"" +\
header_map["error-message"] + "\""
if header_map["message-type"] == EVENT:
# Fetch the content-type
content_type = header_map["content-type"]
# Fetch the event-type
event_type = header_map["event-type"]
if event_type == EVENT_RECORDS:
record += payload[0:payload_length]
elif event_type == EVENT_PROGRESS:
if content_type == "text/xml":
progress = payload[0:payload_length]
elif event_type == EVENT_STATS:
if content_type == "text/xml":
self.__read_stats(payload[0:payload_length])

def __decode_message(self, message, rec):
end_status = False
total_byte_length = message[0:4] # total_byte_length is of 4 bytes
headers_byte_length = message[4: 8] # headers_byte_length is 4 bytes
prelude_crc = message[8:12] # prelude_crc is of 4 bytes
header = message[12:12+byte_int(headers_byte_length)]
payload_length = byte_int(total_byte_length) - \
byte_int(headers_byte_length) - int(16)
payload = message[12 + byte_int(headers_byte_length):
12 + byte_int(headers_byte_length) + payload_length]
message_crc = message[12 + byte_int(headers_byte_length) +
payload_length: 12 +
byte_int(headers_byte_length) +
payload_length + 4]

if not validate_crc(total_byte_length + headers_byte_length,
prelude_crc):
raise CRCValidationError(
{"Checksum Mismatch, MessageCRC of " +
str(calculate_crc(total_byte_length +
headers_byte_length)) +
" does not equal expected CRC of " +
str(byte_int(prelude_crc))})

if not validate_crc(message[0:len(message)-4], message_crc):
raise CRCValidationError(
{"Checksum Mismatch, MessageCRC of " +
str(calculate_crc(message)) +
" does not equal expected CRC of " +
str(byte_int(message_crc))})

header_map = self.__extract_header(header, byte_int(headers_byte_length))

if header_map["message-type"] == EVENT:
# Parse message only when event-type is Records,
# Progress, Stats. Break the loop if event type is End
# Do nothing if event type is Cont
if header_map["event-type"] == EVENT_RECORDS or \
header_map["event-type"] == EVENT_PROGRESS or \
header_map["event-type"] == EVENT_STATS:
self.__parse_message(header_map, payload,
payload_length, rec)

if header_map["event-type"] == EVENT_END:
end_status = True
if header_map["message-type"] == ERROR:
self.__parse_message(header_map, payload, payload_length, rec)
end_status = True
return end_status

def __read(self, num_bytes):
"""
extract each record from the response body ... and buffer it.
Expand All @@ -269,26 +236,28 @@ def __read(self, num_bytes):
res = self.__extract_message()
if len(res) == 0:
return b''
elif res == b'continue':
return res
else:
self.remaining_bytes = res

result = self.remaining_bytes
if num_bytes < len(self.remaining_bytes):
result = self.remaining_bytes[:num_bytes]
del self.remaining_bytes[:num_bytes]
return result
else:
left_in_buffer = self.remaining_bytes[:len(self.remaining_bytes)]
del self.remaining_bytes[:len(left_in_buffer)]
return left_in_buffer

def stream(self, num_bytes):
self.remaining_bytes = self.remaining_bytes[len(result):]
return result

def stream(self, num_bytes=32*1024):
"""
streams the response
"""
while True:
while not self.response.isclosed():
x = self.__read(num_bytes)
if x == b'':
break
if x == b'continue':
continue
elif len(x) < num_bytes:
x += self.__read(num_bytes-len(x))
yield x.decode('utf-8') if isinstance(x, bytearray) else x
yield x.decode('utf-8', errors='ignore')

0 comments on commit 24320bb

Please sign in to comment.