
Streaming Traits

Alex Woods edited this page Aug 5, 2024 · 8 revisions

This wiki page describes how Smithy streaming traits map to the generated Ruby code.

streaming trait

Indicates that the data represented by the shape needs to be streamed. The streaming trait can be applied to either a blob or a union. When applied to a blob, it marks data that should not be held in memory or whose size is not known in advance. When applied to a union, the shape represents an event stream.

@http(method: "POST", uri: "/streaming")
operation StreamingOperation {
    input: StreamingOperationInputOutput,
    output: StreamingOperationInputOutput,
}

structure StreamingOperationInputOutput {
    @httpPayload
    stream: StreamingBlob,
}

@streaming
blob StreamingBlob

// example of using @requiresLength trait in the input structure
structure StreamingWithLengthInput {
    @httpPayload
    stream: FiniteStreamingBlob,
}

@streaming
@requiresLength
blob FiniteStreamingBlob

For streaming input, the protocol builder does the following:

  • The Transfer-Encoding header is set to chunked unless the @requiresLength trait is used. (Net::HTTP refuses to send a streamed body that has neither a Content-Length header nor chunked transfer encoding.)
  • The HTTP request body is set to the input member. Hearth::HTTP::Client then assigns that body to Net::HTTPGenericRequest#body_stream.
  • Net::HTTP uses body_stream as the source for IO.copy_stream when writing the request.
  • IO.copy_stream treats a String source as a file name and opens and reads that file; any other source is read as an IO-like object (one that responds to read or readpartial). For this reason, input that is not IO-like, such as a String, is wrapped in a StringIO so that it is sent as data rather than interpreted as a path.
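The IO.copy_stream behavior described in the last bullet can be checked directly in plain Ruby. This sketch uses only the standard library (no Hearth or Net::HTTP) and copies into a StringIO instead of a socket:

```ruby
require 'stringio'

# An IO-like source (responds to read/readpartial) is read as data.
source = StringIO.new('This is some text')
sink = StringIO.new
bytes_copied = IO.copy_stream(source, sink)
puts bytes_copied # 17
puts sink.string  # This is some text

# A plain String source is treated as a file *path*, not as data,
# so this raises unless a file with that exact name happens to exist.
string_as_path_error = nil
begin
  IO.copy_stream('This is some text', StringIO.new)
rescue Errno::ENOENT => e
  string_as_path_error = e
end
puts string_as_path_error.class # Errno::ENOENT
```

This is why the generated params code below wraps String input in a StringIO before it ever reaches the HTTP client.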

Params ensure that String and nil input is converted to a StringIO:

# params.rb
class StreamingOperationInput
  def self.build(params, context:)
    Hearth::Validator.validate_types!(params, ::Hash, Types::StreamingOperationInput, context: context)
    type = Types::StreamingOperationInput.new
    Hearth::Validator.validate_unknown!(type, params, context: context) if params.is_a?(Hash)
    io = params[:stream] || StringIO.new
    unless io.respond_to?(:read) || io.respond_to?(:readpartial)
      io = StringIO.new(io)
    end
    type.stream = io
    type
  end
end
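The conversion rules above can be exercised without Hearth. The sketch below extracts only the IO-normalization lines into a hypothetical to_stream helper; the type validation and the Types struct are omitted:

```ruby
require 'stringio'

# Hypothetical helper: mirrors the stream-normalization lines of
# StreamingOperationInput.build, without any Hearth validation.
def to_stream(value)
  io = value || StringIO.new # nil becomes an empty stream
  # Anything that is not IO-like (e.g. a String) is wrapped in a StringIO
  io = StringIO.new(io) unless io.respond_to?(:read) || io.respond_to?(:readpartial)
  io
end

from_nil = to_stream(nil)
from_string = to_stream('This is some text')
existing = StringIO.new('already IO')
passed_through = to_stream(existing)

puts from_string.string              # This is some text
puts passed_through.equal?(existing) # true
```

IO-like objects (File, Tempfile, StringIO, pipes) pass through untouched, so large inputs are never read into memory at this stage.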

Validators ensure that the input is IO-like (readable) and, if @requiresLength is set, also check that it responds to size:

# validators.rb
class StreamingOperationInput
  def self.validate!(input, context:)
    Hearth::Validator.validate_types!(input, Types::StreamingOperationInput, context: context)
    unless input[:stream].respond_to?(:read) || input[:stream].respond_to?(:readpartial)
      raise ArgumentError, "Expected #{context} to be an IO like object, got #{input[:stream].class}"
    end
  end
end

class StreamingWithLengthInput
  def self.validate!(input, context:)
    Hearth::Validator.validate_types!(input, Types::StreamingWithLengthInput, context: context)
    unless input[:stream].respond_to?(:read) || input[:stream].respond_to?(:readpartial)
      raise ArgumentError, "Expected #{context} to be an IO like object, got #{input[:stream].class}"
    end

    unless input[:stream].respond_to?(:size)
      raise ArgumentError, "Expected #{context} to respond_to(:size)"
    end
  end
end
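To see which common inputs pass these checks, the sketch below reimplements them as two hypothetical predicates (io_like? and sized_io? are illustrative names, not generated code). A pipe is readable but has no size, so it would be accepted for StreamingOperation but rejected for a @requiresLength payload; a bare String fails both checks (params normally wraps it first):

```ruby
require 'stringio'

# Hypothetical predicate mirroring the IO-like check in the validators
def io_like?(obj)
  obj.respond_to?(:read) || obj.respond_to?(:readpartial)
end

# Additional check applied when the blob has @requiresLength
def sized_io?(obj)
  io_like?(obj) && obj.respond_to?(:size)
end

reader, writer = IO.pipe
checks = {
  string_io: [io_like?(StringIO.new('abc')), sized_io?(StringIO.new('abc'))],
  pipe: [io_like?(reader), sized_io?(reader)],
  string: [io_like?('abc'), sized_io?('abc')]
}
reader.close
writer.close

checks.each { |name, (io_like, sized)| puts "#{name}: io_like=#{io_like} sized=#{sized}" }
```

File and Tempfile objects also respond to size, so they satisfy both validators.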

Builders set the HTTP request body, the Transfer-Encoding header (unless @requiresLength is set), and the appropriate Content-Type header:

# builders.rb
class StreamingOperation
  def self.build(http_req, input:)
    http_req.body = input[:stream]
    http_req.headers['Transfer-Encoding'] = 'chunked'
    http_req.headers['Content-Type'] = 'application/octet-stream'
  end
end
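The builder above shows only the default (chunked) case. The following sketch illustrates the choice between Transfer-Encoding and Content-Length for a @requiresLength payload. FakeRequest and build_streaming_request are hypothetical stand-ins, not Hearth classes, and the actual generated code (or middleware) may set Content-Length in a different place:

```ruby
require 'stringio'

# Hypothetical stand-in for an HTTP request: just a body and a headers hash.
FakeRequest = Struct.new(:body, :headers)

# Hypothetical builder logic: chunked by default, Content-Length when
# the payload's blob shape carries @requiresLength.
def build_streaming_request(http_req, stream, requires_length:)
  http_req.body = stream
  if requires_length
    http_req.headers['Content-Length'] = stream.size.to_s
  else
    http_req.headers['Transfer-Encoding'] = 'chunked'
  end
  http_req.headers['Content-Type'] = 'application/octet-stream'
  http_req
end

chunked = build_streaming_request(FakeRequest.new(nil, {}), StringIO.new('hello'), requires_length: false)
sized = build_streaming_request(FakeRequest.new(nil, {}), StringIO.new('hello'), requires_length: true)

puts chunked.headers['Transfer-Encoding'] # chunked
puts sized.headers['Content-Length']      # 5
```

Exactly one of the two headers is set, which satisfies Net::HTTP's requirement for streamed request bodies.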

For streaming output, the response body is written to an output stream object. The client's private output_stream method chooses it: if the output_stream option is given an object that responds to write, that object is used as the HTTP response body; if a block is given, the block is wrapped in a Hearth::BlockIO; otherwise a new StringIO is used. Hearth::HTTP::Client writes the response data to this object, and the parser assigns the same object to the output's stream member.

# client.rb - in operation
def streaming_operation(params = {}, options = {}, &block)
  response_body = output_stream(options, &block)
  ...
  context = Hearth::Context.new(
    ...
    response: Hearth::HTTP::Response.new(body: response_body),
    ...
  )
  output = stack.run(input, context)
  ...
end

# client.rb - private method
def output_stream(options = {}, &block)
  return options.delete(:output_stream) if options[:output_stream]
  return Hearth::BlockIO.new(block) if block

  ::StringIO.new
end

# parsers.rb
class StreamingOperation
  def self.parse(http_resp)
    data = Types::StreamingOperationOutput.new
    data.stream = http_resp.body
    data
  end
end

Assuming a Client with a streaming_operation operation, the SDK usage is:

params = { stream: 'This is some text' }
File.open('some-file.txt', 'w') do |f|
  resp = client.streaming_operation(params, { output_stream: f })
end

See Smithy Documentation for more details.

requiresLength trait

Indicates that the streaming blob MUST be finite and have a known size. This trait applies only to shapes with the @streaming trait. When it is present, the request is sent with a Content-Length header set to the body's size instead of using chunked transfer encoding, and an error is raised if the IO object's size cannot be determined.

@streaming
@requiresLength
blob FiniteStreamingBlob

See Smithy Documentation for more details.

Event Streams

An event stream is an abstraction that allows multiple messages to be sent asynchronously between a client and server. Event streams support both duplex and simplex streaming. See Smithy Documentation for more details.

Response (output) events arrive asynchronously and are handled by handlers registered when the operation is called. Request (input) events are signaled (sent) by calling the corresponding signal methods on the object returned from the operation.
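The handler-registration side of this can be illustrated with a small, self-contained sketch. SketchEventStreamHandler and its dispatch method are hypothetical; they mirror only the on_my_event naming used in the usage example, not the generated handler's internals. Event decoding, threading, and signaling are omitted:

```ruby
# Hypothetical handler illustrating the registration pattern only; the
# generated MyService::EventStream handler classes may be structured differently.
class SketchEventStreamHandler
  def initialize
    @handlers = {}
  end

  # Register a callback for :my_event (one on_* method per event type)
  def on_my_event(&block)
    (@handlers[:my_event] ||= []) << block
  end

  # Invoked for each decoded event; events with no registered callback are ignored
  def dispatch(event_type, event)
    @handlers.fetch(event_type, []).each { |callback| callback.call(event) }
  end
end

received = []
handler = SketchEventStreamHandler.new
handler.on_my_event { |event| received << event }

# Simulate two events arriving from the server
handler.dispatch(:my_event, { message: 'hello' })
handler.dispatch(:other_event, { message: 'ignored' })

p received
```

Registering callbacks before the call lets events be handled as soon as they arrive, independently of the code that signals request events.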

Assuming a start_event_stream operation with duplex streaming, the client usage is:

client = MyService::Client.new
handler = MyService::EventStream::StartEventStreamHandler.new
# register various handlers
handler.on_my_event { |event| handle_my_event(event) } 

stream = client.start_event_stream(initial_request_params, event_stream_handler: handler)

stream.signal_my_event(my_event_params)

stream.join # close our end of the stream and wait for the server to close gracefully