Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only commit migration transaction if migration can be inserted into the DB #30

Merged
merged 11 commits into from
Nov 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ otp_release:
- 20.2
addons:
apt:
sources:
- mysql-5.7-trusty
packages:
- mysql-server-5.6
- mysql-client-core-5.6
- mysql-client-5.6
- mysql-server
- mysql-client
before_install:
- sudo service postgresql stop
- sudo apt-get -y -qq --purge remove postgresql libpq-dev libpq5 postgresql-client-common postgresql-common
Expand Down Expand Up @@ -44,6 +45,8 @@ before_script:
- echo "host all postgrex_md5_pw 127.0.0.1/32 md5" | sudo tee -a /etc/postgresql/$PGVERSION/main/pg_hba.conf
- echo "host all postgrex_cleartext_pw 127.0.0.1/32 password" | sudo tee -a /etc/postgresql/$PGVERSION/main/pg_hba.conf
- sudo service postgresql restart
- sudo mysql_upgrade
- sudo service mysql --full-restart
notifications:
recipients:
- jose.valim@plataformatec.com.br
Expand Down
74 changes: 59 additions & 15 deletions integration_test/sql/migrator.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ defmodule Ecto.Integration.MigratorTest do
use Ecto.Integration.Case

import Support.FileHelpers
import Ecto.Migrator, only: [migrated_versions: 1]
import ExUnit.CaptureLog
import Ecto.Migrator

alias Ecto.Integration.PoolRepo
alias Ecto.Migration.SchemaMigration
Expand All @@ -14,15 +15,38 @@ defmodule Ecto.Integration.MigratorTest do
:ok
end

# Migration fixture that creates a second `schema_migrations` table under the
# "bad_schema_migrations" prefix, with `version` as a string column and
# `inserted_at` as an integer column.
# NOTE(review): these column types are presumably chosen so that the migrator's
# own insert into this table fails — confirm against SchemaMigration's schema.
defmodule AnotherSchemaMigration do
use Ecto.Migration

def change do
# Two-argument `execute`: the first command creates the prefix, the second
# drops it again (used when the migration is reverted).
execute PoolRepo.create_prefix("bad_schema_migrations"),
PoolRepo.drop_prefix("bad_schema_migrations")

create table(:schema_migrations, prefix: "bad_schema_migrations") do
add :version, :string
add :inserted_at, :integer
end
end
end

# Migration fixture whose `change/0` never returns on its own: it starts a
# linked task that raises "oops" and then sleeps forever.
defmodule BrokenLinkMigration do
use Ecto.Migration

def change do
# The task is linked to the process running the migration, so its crash is
# propagated over the link.
Task.start_link(fn -> raise "oops" end)
# Block indefinitely; the only way out is the exit signal from the linked task.
Process.sleep(:infinity)
end
end

# Migration fixture that creates the `good_migration` table on `up/0` and
# drops it on `down/0`.
# NOTE(review): the bare `:ok` expressions preceding each DSL call look like
# pre-change lines shown next to their replacements in this diff view —
# confirm against the actual file.
defmodule GoodMigration do
use Ecto.Migration

def up do
:ok
create table(:good_migration)
end

def down do
:ok
drop table(:good_migration)
end
end

Expand All @@ -34,20 +58,14 @@ defmodule Ecto.Integration.MigratorTest do
end
end

import Ecto.Migrator

# Runs GoodMigration as version 30 and checks the bookkeeping row it leaves.
test "schema migration" do
up(PoolRepo, 30, GoodMigration, log: false)

# The single-element match asserts that exactly one SchemaMigration row exists.
[migration] = PoolRepo.all(SchemaMigration)
assert migration.version == 30
assert migration.inserted_at
end

test "migrations up and down" do
assert migrated_versions(PoolRepo) == []
assert up(PoolRepo, 31, GoodMigration, log: false) == :ok

[migration] = PoolRepo.all(SchemaMigration)
assert migration.version == 31
assert migration.inserted_at

assert migrated_versions(PoolRepo) == [31]
assert up(PoolRepo, 31, GoodMigration, log: false) == :already_up
assert migrated_versions(PoolRepo) == [31]
Expand All @@ -57,10 +75,37 @@ defmodule Ecto.Integration.MigratorTest do
assert migrated_versions(PoolRepo) == []
end

test "bad migration" do
# Checks that a migration's changes are not kept when its version cannot be
# recorded in the schema migrations table of the given prefix.
test "does not commit migration if insert into schema migration fails" do
# First we create a new schema migration table in another prefix
assert up(PoolRepo, 33, AnotherSchemaMigration, log: false) == :ok
assert migrated_versions(PoolRepo) == [33]

# Each `catch_error` requires its expression to raise: the migration itself
# when run against the "bad_schema_migrations" prefix, and then both queries
# on "good_migration" (with and without the prefix).
# NOTE(review): the queries presumably fail because the table was never
# committed — confirm. The captured log must mention the failed update.
assert capture_log(fn ->
catch_error(up(PoolRepo, 34, GoodMigration, log: false, prefix: "bad_schema_migrations"))
catch_error(PoolRepo.all("good_migration"))
catch_error(PoolRepo.all("good_migration", prefix: "bad_schema_migrations"))
end) =~ "Could not update schema migrations"

# Clean up by reverting the migration that created the extra prefix and table.
assert down(PoolRepo, 33, AnotherSchemaMigration, log: false) == :ok
end

# Running BadMigration (defined outside the lines visible here) must raise an
# error; `catch_error` returns the caught error, which `assert` requires to be
# truthy.
test "bad execute migration" do
assert catch_error(up(PoolRepo, 31, BadMigration, log: false))
end

# Exercises BrokenLinkMigration, whose linked task raises "oops", in two ways:
# from a separate linked task and directly from the test process.
test "broken link migration" do
# Trap exits so linked-process crashes arrive as {:EXIT, pid, reason} messages
# instead of terminating the test process.
Process.flag(:trap_exit, true)

# Case 1: run the migration inside a linked task; that task must exit and
# "oops" must show up in the captured log.
assert capture_log(fn ->
{:ok, pid} = Task.start_link(fn -> up(PoolRepo, 31, BrokenLinkMigration, log: false) end)
assert_receive {:EXIT, ^pid, _}
end) =~ "oops"

# Case 2: run the migration in the test process itself; `up` must exit, and
# "oops" must again be logged.
assert capture_log(fn ->
catch_exit(up(PoolRepo, 31, BrokenLinkMigration, log: false))
end) =~ "oops"
end

test "run up to/step migration" do
in_tmp fn path ->
create_migration(47)
Expand Down Expand Up @@ -140,7 +185,6 @@ defmodule Ecto.Integration.MigratorTest do
defmodule #{module} do
use Ecto.Migration


def up do
update &[#{num}|&1]
end
Expand Down
77 changes: 46 additions & 31 deletions lib/ecto/migrator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -108,21 +108,12 @@ defmodule Ecto.Migrator do
end

defp do_up(repo, version, module, opts) do
run_maybe_in_transaction(repo, module, fn ->
async_migrate_maybe_in_transaction(repo, version, module, :up, opts, fn ->
attempt(repo, version, module, :forward, :up, :up, opts)
|| attempt(repo, version, module, :forward, :change, :up, opts)
|| {:error, Ecto.MigrationError.exception(
"#{inspect module} does not implement a `up/0` or `change/0` function")}
end)
|> case do
:ok ->
verbose_schema_migration repo, "update schema migrations", fn ->
SchemaMigration.up(repo, version, opts[:prefix])
end
:ok
error ->
error
end
end

@doc """
Expand Down Expand Up @@ -153,41 +144,65 @@ defmodule Ecto.Migrator do
end

defp do_down(repo, version, module, opts) do
run_maybe_in_transaction(repo, module, fn ->
async_migrate_maybe_in_transaction(repo, version, module, :down, opts, fn ->
attempt(repo, version, module, :forward, :down, :down, opts)
|| attempt(repo, version, module, :backward, :change, :down, opts)
|| {:error, Ecto.MigrationError.exception(
"#{inspect module} does not implement a `down/0` or `change/0` function")}
end)
|> case do
:ok ->
end

defp async_migrate_maybe_in_transaction(repo, version, module, direction, opts, fun) do
parent = self()
ref = make_ref()
task = Task.async(fn -> run_maybe_in_transaction(parent, ref, repo, module, fun) end)

if migrated_successfully?(ref, task.pid) do
try do
# The table with schema migrations can only be updated from
# the parent process because it has a lock on the table

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an alternative approach to this, which is to use advisory locks.

I wrote a patch to Rails a couple of years ago that added this functionality to ActiveRecord. It avoids the problem of locking the schema_migrations table in a separate transaction.

That way you don't need two transactions at all, and can commit to the schema_migrations table directly from the main migration transaction.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use advisory locks, perhaps we can roll back the change that requires pool_size: 2 for migrations? Ref: elixir-ecto/ecto#2258

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samphilipd that's nice to hear. Can you dig up a link to that commit/PR please?

Although I would wait before migrating to advisory locks because we need to support the current mechanism anyway for other databases.

Copy link

@samsondav samsondav Nov 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@josevalim Both MySQL and Postgres support advisory locks. I am not sure about other databases, these are the only two that Rails guarantees safe concurrent migrations for AFAIK.

@wojtekmach it seems likely that yes, you will be able to do the migration with only one connection if you use advisory locks.

Take a look at migration.rb in ActiveRecord.

My original PR is here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the info! There is also a discussion here and here.

verbose_schema_migration repo, "update schema migrations", fn ->
SchemaMigration.down(repo, version, opts[:prefix])
apply(SchemaMigration, direction, [repo, version, opts[:prefix]])
end
:ok
error ->
error
catch
kind, error ->
Task.shutdown(task, :brutal_kill)
:erlang.raise(kind, error, System.stacktrace())
end
end

send(task.pid, ref)
Task.await(task, :infinity)
end

defp run_maybe_in_transaction(repo, module, fun) do
fn -> do_run_maybe_in_transaction(repo, module, fun) end
|> Task.async()
|> Task.await(:infinity)
# Waits for the outcome of the migration task identified by `ref` and `pid`.
#
# Returns `true` only when the task reports `{ref, :ok}`. Any other value
# tagged with `ref`, or an `{:EXIT, pid, reason}` message for the task
# process, yields `false`. Blocks until one of those messages arrives.
defp migrated_successfully?(ref, pid) do
  receive do
    {^ref, outcome} -> outcome == :ok
    {:EXIT, ^pid, _reason} -> false
  end
end

defp do_run_maybe_in_transaction(repo, module, fun) do
cond do
module.__migration__[:disable_ddl_transaction] ->
fun.()
repo.__adapter__.supports_ddl_transaction? ->
{:ok, result} = repo.transaction(fun, log: false, timeout: :infinity)
result
true ->
fun.()
defp run_maybe_in_transaction(parent, ref, repo, module, fun) do
if module.__migration__[:disable_ddl_transaction] ||
not repo.__adapter__.supports_ddl_transaction? do
send_and_receive(parent, ref, fun.())
else
{:ok, result} =
repo.transaction(
fn -> send_and_receive(parent, ref, fun.()) end,
log: false, timeout: :infinity
)

result
end
catch kind, reason ->
{kind, reason, System.stacktrace}
send_and_receive(parent, ref, {kind, reason, System.stacktrace})
end

defp send_and_receive(parent, ref, value) do
Copy link
Contributor

@fertapric fertapric Nov 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about naming this report_and_wait_for_schema_migrations_update or similar? It can also be split into:

report_migration_result(parent, {ref, value})
wait_for_schema_migrations_update(ref, value)

I'm trying to be more clear about why this "two-step" flow is required. With that goal in mind, the code above:

try do
  receive do
    {^ref, :ok} ->
      verbose_schema_migration repo, "update schema migrations", fn ->
        apply(SchemaMigration, direction, [repo, version, opts[:prefix]])
      end
      
    {^ref, _} ->
      :ok
      
    {:EXIT, ^pid, _} ->
      :ok
  end
catch
  kind, error ->
    Task.shutdown(task, :brutal_kill)
    :erlang.raise(kind, error, System.stacktrace())
else
  _ ->
    send(task.pid, ref)
    Task.await(task, :infinity)
end

could be abstracted to:

parent = self()
result_ref = make_ref()

task = Task.async(fn -> run_maybe_in_transaction(parent, result_ref, repo, module, fun) end)

case wait_for_migration_result(task, result_ref) do
  :ok ->
    # The table with schema migrations can only be updated from the parent process
    # because it has a lock acquired on that table.
    report_schema_migrations_update(task, fn ->
      verbose_schema_migration repo, "update schema migrations", fn ->
        apply(SchemaMigration, direction, [repo, version, opts[:prefix]])
      end
    end)
  
  _ ->
    :ok
end)

Task.await(task, :infinity)

Thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it. I pushed something similar, although I kept the try catch inline because I don't want to hide the task management into a bunch of functions and instead keep it all in one place. :)

send parent, {ref, value}
receive do: (^ref -> value)
end

defp attempt(repo, version, module, direction, operation, reference, opts) do
Expand Down