Skip to content

Commit

Permalink
Moar debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
driv3r committed Oct 10, 2023
1 parent 8159121 commit 122b61c
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 16 deletions.
43 changes: 29 additions & 14 deletions test/helpers/ghostferry_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "tmpdir"
require "webrick"
require "cgi"
require "securerandom"

module GhostferryHelper
GHOSTFERRY_TEMPDIR = File.join(Dir.tmpdir, "ghostferry-integration")
Expand Down Expand Up @@ -47,11 +48,12 @@ module Status
AFTER_BINLOG_APPLY = "AFTER_BINLOG_APPLY"
end

attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines
attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines, :tag

def initialize(main_path, config: {}, log_capturer:, message_timeout: 30, port: 39393)
@log_capturer = log_capturer
@logger = log_capturer.logger
@tag = SecureRandom.hex[0..3]

@main_path = main_path
@config = config
Expand Down Expand Up @@ -94,6 +96,7 @@ def initialize(main_path, config: {}, log_capturer:, message_timeout: 30, port:

# The main method to call to run a Ghostferry subprocess.
def run(resuming_state = nil)
@logger.info("[#{@tag}] ghostferry#run(state:#{(!resuming_state.nil?).inspect})")
resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash)

compile_binary
Expand All @@ -112,21 +115,26 @@ def run(resuming_state = nil)
# When using this method, you need to ensure that the datawriter has been
# stopped properly (if you're using stop_datawriter_during_cutover).
#
# Runs Ghostferry and requires the run to end with GhostferryExitFailure.
#
# resuming_state - optional state forwarded unchanged to #run.
#
# Returns the state Ghostferry dumped, i.e. everything collected in @stdout
# joined together and parsed as JSON.
# Raises RuntimeError when #run returns without raising GhostferryExitFailure.
def run_expecting_interrupt(resuming_state = nil)
  begin
    has_state = !resuming_state.nil?
    @logger.info("[#{@tag}] ghostferry#run_expecting_interrupt(state:#{has_state.inspect})")
    run(resuming_state)
  rescue GhostferryExitFailure
    @logger.info("[#{@tag}] ghostferry#run_expecting_interrupt: got GhostferryExitFailure")
    # The captured stdout chunks are treated as one JSON document.
    return JSON.parse(@stdout.join(""))
  end

  # Reaching this point means #run completed without the expected failure.
  @logger.error("[#{@tag}] ghostferry#run_expecting_interrupt: something's wrong")
  raise "Ghostferry did not get interrupted"
end

# Same as above - ensure that the datawriter has been
# stopped properly (if you're using stop_datawriter_during_cutover).
#
# Runs Ghostferry and requires the run to end with GhostferryExitFailure.
#
# resuming_state - optional state forwarded unchanged to #run.
#
# Returns whatever the logger call in the rescue branch returns; the dumped
# state is not parsed here.
# Raises RuntimeError, prefixed with this instance's tag, when #run returns
# without raising GhostferryExitFailure.
def run_expecting_failure(resuming_state = nil)
  @logger.info("[#{@tag}] ghostferry#run_expecting_failure(state:#{(!resuming_state.nil?).inspect})")
  run(resuming_state)
rescue GhostferryExitFailure
  @logger.info("[#{@tag}] ghostferry#run_expecting_failure: got GhostferryExitFailure")
else
  # Only one raise: a second one after it could never execute, and the tag is
  # what ties this failure to the matching log lines.
  raise "[#{@tag}] Ghostferry did not fail"
end

def run_with_logs(resuming_state = nil)
Expand All @@ -140,14 +148,14 @@ def run_with_logs(resuming_state = nil)
# Builds the Ghostferry test binary with `go build` unless a file already
# exists at @compiled_binary_path.
#
# Returns nil.
# Raises RuntimeError, prefixed with this instance's tag, when the build
# command does not succeed.
def compile_binary
  return if File.exist?(@compiled_binary_path)

  # Log once, with the tag, so the line can be matched to this instance.
  @logger.debug("[#{@tag}] compiling test binary to #{@compiled_binary_path}")
  compiled = system(
    "go", "build",
    "-o", @compiled_binary_path,
    @main_path
  )

  # Kernel#system yields false on a non-zero exit and nil when the command
  # could not be run at all; both count as a failed build.
  raise "[#{@tag}] could not compile ghostferry" unless compiled
end

def start_server
Expand Down Expand Up @@ -179,14 +187,15 @@ def start_server
resp.status = 400
@server.shutdown
elsif statuses.size > 1
@logger.warn("Got multiple statuses at once: #{statuses.inspect}")
@logger.warn("[#{@tag}] Got multiple statuses at once: #{statuses.inspect}")
puts "Got multiple statuses at once: #{statuses.inspect}"
end

@last_message_time = now

data = query["data"]

@logger.info("[#{@tag}] server: got / with #{statuses.inspect}")
statuses.each do |status|
next if @status_handlers[status].nil?

Expand All @@ -202,6 +211,7 @@ def start_server

@server.mount_proc "/callbacks/progress" do |req, resp|
begin
@logger.info("[#{@tag}] server: got /callbacks/progress")
unless req.body
@server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send data")
resp.status = 400
Expand All @@ -216,6 +226,7 @@ def start_server

@server.mount_proc "/callbacks/state" do |req, resp|
begin
@logger.info("[#{@tag}] server: got /callbacks/state")
unless req.body
@server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send data")
resp.status = 400
Expand All @@ -228,14 +239,15 @@ def start_server
end

@server.mount_proc "/callbacks/error" do |req, resp|
@logger.info("[#{@tag}] server: got /callbacks/error")
@error = JSON.parse(JSON.parse(req.body)["Payload"])
@callback_handlers["error"].each { |f| f.call(@error) } unless @callback_handlers["error"].nil?
end

@server_thread = Thread.new do
@logger.debug("starting server thread")
@logger.debug("[#{@tag}] starting server thread")
@server.start
@logger.debug("server thread stopped")
@logger.debug("[#{@tag}] server thread stopped")
end
end

Expand Down Expand Up @@ -275,7 +287,7 @@ def start_ghostferry(resuming_state = nil)
environment["GHOSTFERRY_MARGINALIA"] = @config[:marginalia]
end

@logger.debug("starting ghostferry test binary #{@compiled_binary_path}")
@logger.debug("[#{@tag}] starting ghostferry test binary #{@compiled_binary_path}")
Open3.popen3(environment, @compiled_binary_path) do |stdin, stdout, stderr, wait_thr|
stdin.puts(resuming_state) unless resuming_state.nil?
stdin.close
Expand All @@ -297,7 +309,7 @@ def start_ghostferry(resuming_state = nil)

if reader == stdout
@stdout << line
@logger.debug("stdout: #{line}")
@logger.debug("[#{@tag}] stdout: #{line}")
elsif reader == stderr
@stderr << line
if json_log_line?(line)
Expand All @@ -310,8 +322,11 @@ def start_ghostferry(resuming_state = nil)
if logline["level"] == "error"
@error_lines << logline
end

@logger.debug("[#{@tag}] stderr: #{line}") unless tag.end_with?("_binlog_streamer")
else
@logger.debug("[#{@tag}] stderr: #{line}")
end
@logger.debug("stderr: #{line}")
end
end
end
Expand All @@ -320,9 +335,9 @@ def start_ghostferry(resuming_state = nil)
@pid = 0
end

@logger.debug("ghostferry test binary exitted: #{@exit_status}")
@logger.debug("[#{@tag}] ghostferry test binary exitted: #{@exit_status}")
if @exit_status.exitstatus != 0
raise GhostferryExitFailure, "ghostferry test binary returned non-zero status: #{@exit_status}"
raise GhostferryExitFailure, "[#{@tag}] ghostferry test binary returned non-zero status: #{@exit_status}"
end
end
end
Expand All @@ -336,14 +351,14 @@ def start_server_watchdog
if (now - @last_message_time) > @message_timeout
@server.shutdown
@log_capturer.print_output
raise "ghostferry did not report to the integration test server for the last #{@message_timeout}s"
raise "[#{@tag}] ghostferry did not report to the integration test server for the last #{@message_timeout}s"
end

sleep 1
end

@server.shutdown
@logger.debug("server watchdog thread stopped")
@logger.debug("[#{@tag}] server watchdog thread stopped")
end

@server_watchdog_thread.abort_on_exception = true
Expand Down
9 changes: 9 additions & 0 deletions test/integration/interrupt_resume_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def test_interrupt_resume_without_writes_to_source_to_check_target_state_when_in

# Writes one batch
ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
info("test[09]: on_status, received -> TERM")
ghostferry.send_signal("TERM")
end

Expand All @@ -32,6 +33,7 @@ def test_interrupt_and_resume_without_last_known_schema_cache

# Writes one batch
ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
info("test[31]: on_status, received -> TERM")
ghostferry.send_signal("TERM")
end

Expand Down Expand Up @@ -448,19 +450,26 @@ def test_interrupt_resume_idempotence_with_multiple_interrupts
end

def test_interrupt_resume_idempotence_with_multiple_interrupts_and_writes_to_source
@debug_me = true
info("test[452] start\n\n")
ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2)

datawriter = new_source_datawriter
start_datawriter_with_ghostferry(datawriter, ghostferry)

info("test[461] ghostferry#run_expecting_interrupt, no state\n\n")
dumped_state = ghostferry.run_expecting_interrupt
assert_basic_fields_exist_in_dumped_state(dumped_state)

ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2)

info("test[466] ghostferry#run_expecting_interrupt, with state\n\n")
ghostferry.run_expecting_interrupt(dumped_state)

ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
stop_datawriter_during_cutover(datawriter, ghostferry)

info("test[472] ghostferry#run_with_logs, with state\n\n")
ghostferry.run_with_logs(dumped_state)

assert_test_table_is_identical
Expand Down
15 changes: 13 additions & 2 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
require "data_writer_helper"

class LogCapturer
attr_reader :logger
attr_reader :logger, :logger_device

def initialize(level: Logger::DEBUG)
@capture = ENV["DEBUG"] != "1"
Expand Down Expand Up @@ -50,18 +50,24 @@ def new_ghostferry(filepath, config: {})
# Transform path to something ruby understands
path = File.join(GO_CODE_PATH, filepath, "main.go")
g = Ghostferry.new(path, config: config, log_capturer: @log_capturer)
info("[#{g.tag}] new_ghostferry: create")
@ghostferry_instances << g
g
end

def new_ghostferry_with_interrupt_after_row_copy(filepath, config: {}, after_batches_written: 0)
g = new_ghostferry(filepath, config: config)

info("[#{g.tag}] new_ghostferry_wiarc: register status hook")
batches_written = 0
g.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
batches_written += 1

if batches_written >= after_batches_written
info("[#{g.tag}] new_ghostferry_wiarc: on_status #{batches_written} >= #{after_batches_written} -> true")
g.send_signal("TERM")
else
info("[#{g.tag}] new_ghostferry_wiarc: on_status #{batches_written} >= #{after_batches_written} -> false")
end
end

Expand All @@ -83,6 +89,10 @@ def setup_signal_watcher
Signal.trap("TERM") { self.on_term }
end

# Forwards +msg+ at INFO level to the logger exposed by @log_capturer and
# returns that logger call's result.
def info(msg)
  capture_logger = @log_capturer.logger
  capture_logger.info(msg)
end

##############
# Test Hooks #
##############
Expand All @@ -105,6 +115,7 @@ def before_setup

# Same thing with DataWriter as above
@datawriter_instances = []
@debug_me = nil
end

def after_teardown
Expand All @@ -116,7 +127,7 @@ def after_teardown
datawriter.stop_and_join
end

@log_capturer.print_output if self.failure
@log_capturer.print_output if self.failure || @debug_me
@log_capturer.reset
super
end
Expand Down

0 comments on commit 122b61c

Please sign in to comment.