Skip to content

Commit

Permalink
Only commit migration transaction if migration can be inserted into t…
Browse files Browse the repository at this point in the history
…he DB (#30)
  • Loading branch information
josevalim committed Nov 7, 2018
1 parent 2f7f4bc commit bd2efde
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 49 deletions.
9 changes: 6 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ otp_release:
- 20.2
addons:
apt:
sources:
- mysql-5.7-trusty
packages:
- mysql-server-5.6
- mysql-client-core-5.6
- mysql-client-5.6
- mysql-server
- mysql-client
before_install:
- sudo service postgresql stop
- sudo apt-get -y -qq --purge remove postgresql libpq-dev libpq5 postgresql-client-common postgresql-common
Expand Down Expand Up @@ -44,6 +45,8 @@ before_script:
- echo "host all postgrex_md5_pw 127.0.0.1/32 md5" | sudo tee -a /etc/postgresql/$PGVERSION/main/pg_hba.conf
- echo "host all postgrex_cleartext_pw 127.0.0.1/32 password" | sudo tee -a /etc/postgresql/$PGVERSION/main/pg_hba.conf
- sudo service postgresql restart
- sudo mysql_upgrade
- sudo service mysql --full-restart
notifications:
recipients:
- jose.valim@plataformatec.com.br
Expand Down
74 changes: 59 additions & 15 deletions integration_test/sql/migrator.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ defmodule Ecto.Integration.MigratorTest do
use Ecto.Integration.Case

import Support.FileHelpers
import Ecto.Migrator, only: [migrated_versions: 1]
import ExUnit.CaptureLog
import Ecto.Migrator

alias Ecto.Integration.PoolRepo
alias Ecto.Migration.SchemaMigration
Expand All @@ -14,15 +15,38 @@ defmodule Ecto.Integration.MigratorTest do
:ok
end

defmodule AnotherSchemaMigration do
use Ecto.Migration

def change do
execute PoolRepo.create_prefix("bad_schema_migrations"),
PoolRepo.drop_prefix("bad_schema_migrations")

create table(:schema_migrations, prefix: "bad_schema_migrations") do
add :version, :string
add :inserted_at, :integer
end
end
end

defmodule BrokenLinkMigration do
use Ecto.Migration

def change do
Task.start_link(fn -> raise "oops" end)
Process.sleep(:infinity)
end
end

defmodule GoodMigration do
use Ecto.Migration

def up do
:ok
create table(:good_migration)
end

def down do
:ok
drop table(:good_migration)
end
end

Expand All @@ -34,20 +58,14 @@ defmodule Ecto.Integration.MigratorTest do
end
end

import Ecto.Migrator

test "schema migration" do
up(PoolRepo, 30, GoodMigration, log: false)

[migration] = PoolRepo.all(SchemaMigration)
assert migration.version == 30
assert migration.inserted_at
end

test "migrations up and down" do
assert migrated_versions(PoolRepo) == []
assert up(PoolRepo, 31, GoodMigration, log: false) == :ok

[migration] = PoolRepo.all(SchemaMigration)
assert migration.version == 31
assert migration.inserted_at

assert migrated_versions(PoolRepo) == [31]
assert up(PoolRepo, 31, GoodMigration, log: false) == :already_up
assert migrated_versions(PoolRepo) == [31]
Expand All @@ -57,10 +75,37 @@ defmodule Ecto.Integration.MigratorTest do
assert migrated_versions(PoolRepo) == []
end

test "bad migration" do
test "does not commit migration if insert into schema migration fails" do
# First we create a new schema migration table in another prefix
assert up(PoolRepo, 33, AnotherSchemaMigration, log: false) == :ok
assert migrated_versions(PoolRepo) == [33]

assert capture_log(fn ->
catch_error(up(PoolRepo, 34, GoodMigration, log: false, prefix: "bad_schema_migrations"))
catch_error(PoolRepo.all("good_migration"))
catch_error(PoolRepo.all("good_migration", prefix: "bad_schema_migrations"))
end) =~ "Could not update schema migrations"

assert down(PoolRepo, 33, AnotherSchemaMigration, log: false) == :ok
end

test "bad execute migration" do
assert catch_error(up(PoolRepo, 31, BadMigration, log: false))
end

test "broken link migration" do
Process.flag(:trap_exit, true)

assert capture_log(fn ->
{:ok, pid} = Task.start_link(fn -> up(PoolRepo, 31, BrokenLinkMigration, log: false) end)
assert_receive {:EXIT, ^pid, _}
end) =~ "oops"

assert capture_log(fn ->
catch_exit(up(PoolRepo, 31, BrokenLinkMigration, log: false))
end) =~ "oops"
end

test "run up to/step migration" do
in_tmp fn path ->
create_migration(47)
Expand Down Expand Up @@ -140,7 +185,6 @@ defmodule Ecto.Integration.MigratorTest do
defmodule #{module} do
use Ecto.Migration
def up do
update &[#{num}|&1]
end
Expand Down
77 changes: 46 additions & 31 deletions lib/ecto/migrator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -108,21 +108,12 @@ defmodule Ecto.Migrator do
end

defp do_up(repo, version, module, opts) do
run_maybe_in_transaction(repo, module, fn ->
async_migrate_maybe_in_transaction(repo, version, module, :up, opts, fn ->
attempt(repo, version, module, :forward, :up, :up, opts)
|| attempt(repo, version, module, :forward, :change, :up, opts)
|| {:error, Ecto.MigrationError.exception(
"#{inspect module} does not implement a `up/0` or `change/0` function")}
end)
|> case do
:ok ->
verbose_schema_migration repo, "update schema migrations", fn ->
SchemaMigration.up(repo, version, opts[:prefix])
end
:ok
error ->
error
end
end

@doc """
Expand Down Expand Up @@ -153,41 +144,65 @@ defmodule Ecto.Migrator do
end

defp do_down(repo, version, module, opts) do
run_maybe_in_transaction(repo, module, fn ->
async_migrate_maybe_in_transaction(repo, version, module, :down, opts, fn ->
attempt(repo, version, module, :forward, :down, :down, opts)
|| attempt(repo, version, module, :backward, :change, :down, opts)
|| {:error, Ecto.MigrationError.exception(
"#{inspect module} does not implement a `down/0` or `change/0` function")}
end)
|> case do
:ok ->
end

defp async_migrate_maybe_in_transaction(repo, version, module, direction, opts, fun) do
parent = self()
ref = make_ref()
task = Task.async(fn -> run_maybe_in_transaction(parent, ref, repo, module, fun) end)

if migrated_successfully?(ref, task.pid) do
try do
# The table with schema migrations can only be updated from
# the parent process because it has a lock on the table
verbose_schema_migration repo, "update schema migrations", fn ->
SchemaMigration.down(repo, version, opts[:prefix])
apply(SchemaMigration, direction, [repo, version, opts[:prefix]])
end
:ok
error ->
error
catch
kind, error ->
Task.shutdown(task, :brutal_kill)
:erlang.raise(kind, error, System.stacktrace())
end
end

send(task.pid, ref)
Task.await(task, :infinity)
end

defp run_maybe_in_transaction(repo, module, fun) do
fn -> do_run_maybe_in_transaction(repo, module, fun) end
|> Task.async()
|> Task.await(:infinity)
defp migrated_successfully?(ref, pid) do
receive do
{^ref, :ok} -> true
{^ref, _} -> false
{:EXIT, ^pid, _} -> false
end
end

defp do_run_maybe_in_transaction(repo, module, fun) do
cond do
module.__migration__[:disable_ddl_transaction] ->
fun.()
repo.__adapter__.supports_ddl_transaction? ->
{:ok, result} = repo.transaction(fun, log: false, timeout: :infinity)
result
true ->
fun.()
defp run_maybe_in_transaction(parent, ref, repo, module, fun) do
if module.__migration__[:disable_ddl_transaction] ||
not repo.__adapter__.supports_ddl_transaction? do
send_and_receive(parent, ref, fun.())
else
{:ok, result} =
repo.transaction(
fn -> send_and_receive(parent, ref, fun.()) end,
log: false, timeout: :infinity
)

result
end
catch kind, reason ->
{kind, reason, System.stacktrace}
send_and_receive(parent, ref, {kind, reason, System.stacktrace})
end

defp send_and_receive(parent, ref, value) do
send parent, {ref, value}
receive do: (^ref -> value)
end

defp attempt(repo, version, module, direction, operation, reference, opts) do
Expand Down

0 comments on commit bd2efde

Please sign in to comment.