Skip to content

Commit

Permalink
Merge pull request #34 from ArveH/31-copy-data-without-creating-tables
Browse files Browse the repository at this point in the history
31 copy data without creating tables
  • Loading branch information
ArveH committed May 29, 2024
2 parents 4c9a031 + b96b314 commit 5607180
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 61 deletions.
11 changes: 8 additions & 3 deletions src/ABulkCopy.Cmd.Internal/CmdArguments.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ public class CmdArguments
[Option('f', "folder", Required = false, HelpText = "The source/destination folder for schema and data files.")]
public string? Folder { get; set; }

[Option('m', "mappings-file", Required = false, HelpText = "The path and file name of a json file containing key-value pairs for mapping schema names and collation names. E.g. mapping the \"dbo\" schema in SQL Server to the \"public\" schema in Postgres. There is a sample-mappings.json file accompanying the executable.")]
[Option('m', "mappings-file", Required = false, HelpText = "The path and file name of a json file containing key-value pairs for mapping schema names and collation names. E.g. mapping the \"dbo\" schema in SQL Server to the \"public\" schema in Postgres. There is a sample-mappings.json file accompanying the executable. This parameter is only used when direction = In")]
public string? MappingsFile { get; set; }

[Option('l', "log-file", Required = false, HelpText = "Full path for log file.")]
public string? LogFile { get; set; }

[Option('q', "add-quotes", Required = false, HelpText = "Flag to quote all identifiers. Only applicable for Postgres, where there is a significant difference in behaviour when quoting identifiers. NOTE: Postgres reserved words are always quoted. For SQL Server, this flag is ignored, and identifiers will always be quoted.")]
[Option('q', "add-quotes", Required = false, HelpText = "Flag to quote all identifiers. Only applicable for Postgres, where there is a significant difference in behaviour when quoting identifiers. NOTE: Postgres reserved words are always quoted. For SQL Server, this flag is ignored, and identifiers will always be quoted. This parameter is only used when direction = In")]
public bool AddQuotes { get; set; }

[Option("schema-filter", Required = false, HelpText = "A comma separated list of schema names. When it's not used, all schemas will be copied, except 'guest', 'information_schema', 'sys' and 'logs'")]
[Option("schema-filter", Required = false, HelpText = "A comma separated list of schema names. When it's not used, all schemas will be copied, except 'guest', 'information_schema', 'sys' and 'logs'. This parameter is only used when direction = Out")]
public string? SchemaFilter { get; set; }

[Option('s', "search-filter", Required = false, HelpText = "A string to filter table names or file names. Note that the syntax of the SearchFilter is different depending on the context. For copy in from a file system, use a RegEx in .NET format. E.g. \"\\b(clients|scopes)\\b\" will match \"clients.schema\" and \"scopes.schema\", but not \"someclients.schema\" nor \"clients2.schema\". For copy out from SQL Server, the SearchFilter is the rhs of a LIKE clause. E.g. \"a[sa][ya][sg]%\" to get all tables that starts with 'a' followed by \"sys\" or \"aag\" (but also \"asas\", \"aayg\", and other combinations). If you don't use a search-filter, all tables are copied.")]
Expand All @@ -32,6 +32,10 @@ public class CmdArguments
[Option("empty-string", Required = false, HelpText = "Handle strings that contains whitespace only. Legal values are \"Single\" (an empty string is converted to single space), \"Empty\" (a single space is converted to empty string), \"ForceSingle\" (empty strings and whitespace is converted to a single space), \"ForceEmpty\" (all whitespace is removed). NOTE: This flag has no effect during export, or if strings contains any non-whitespace characters.")]
public EmptyStringFlag? EmptyString { get; set; }

[Option("skip-create", Required = false, HelpText = "This is an experimental parameter. It assumes that the tables already exists in the database, and will skip the \"create table\" step. The thought is that tables are created using Entity Framework migrations, then ABulkCopy is used to insert data. NOTE: Schema files are still needed to create the dependency graph and the copy statements, and they MUST correspond to the tables already in the database. This parameter is only used when direction = In")]
public bool SkipCreate { get; set; }


public Dictionary<string, string?> ToAppSettings()
{
var appSettings = new Dictionary<string, string?>();
Expand Down Expand Up @@ -61,6 +65,7 @@ public class CmdArguments
appSettings.Add(Constants.Config.LogFile, LogFile);
}
appSettings.Add(Constants.Config.AddQuotes, AddQuotes.ToString());
appSettings.Add(Constants.Config.SkipCreate, SkipCreate.ToString());
if (!string.IsNullOrWhiteSpace(SchemaFilter))
{
appSettings.Add(Constants.Config.SchemaFilter, SchemaFilter);
Expand Down
170 changes: 112 additions & 58 deletions src/ABulkCopy.Cmd.Internal/CopyIn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,27 +64,30 @@ public async Task RunAsync(Rdbms rdbms, CancellationToken ct)

var errors = 0;
ITableSequencer tableSequencer = new TableSequencer(
allTables.Where(t => !t.IsIndependent).DistinctBy(n => n.Name),
allTables.Where(t => !t.IsIndependent).DistinctBy(n => n.Name),
allTables.Where(t => t.IsIndependent),
_logger);

var emptyStringFlag = _config.ToEnum(Constants.Config.EmptyString);
var skipCreate = Convert.ToBoolean(_config[Constants.Config.SkipCreate]);

await Parallel.ForEachAsync(
tableSequencer.GetTablesReadyForCreationAsync(),
ct,
tableSequencer.GetTablesReadyForCreationAsync(),
ct,
async (node, _) =>
{
if (node.TableDefinition == null) throw new ArgumentNullException(nameof(node.TableDefinition));
if (!await CreateTableAsync(
folder,
node.TableDefinition,
emptyStringFlag,
if (!await CreateAndInsertAsync(
folder,
node.TableDefinition,
emptyStringFlag,
skipCreate,
ct)
.ConfigureAwait(false))
{
Interlocked.Increment(ref errors);
}
tableSequencer.TableFinished(node);
}).ConfigureAwait(false);

Expand All @@ -103,87 +106,138 @@ public async Task RunAsync(Rdbms rdbms, CancellationToken ct)
Console.WriteLine(
$"Creating and filling {schemaFiles.Count} {"table".Plural(schemaFiles.Count)} finished.");
}

sw.Stop();
_logger.Information("The total CopyIn operation took {Elapsed}", sw.Elapsed.ToString("g"));
Console.WriteLine($"The total CopyIn operation took {sw.Elapsed:g}");
}

private async Task<bool> CreateTableAsync(string folder,
private async Task<bool> CreateAndInsertAsync(
string folder,
TableDefinition tableDefinition,
EmptyStringFlag emptyStringFlag,
EmptyStringFlag emptyStringFlag,
bool skipCreate,
CancellationToken ct)
{
IADataReader? dataReader = null;
var errorOccurred = false;
try
if (!skipCreate)
{
try
if (!await RecreateTableAsync(tableDefinition, ct)
.ConfigureAwait(false))
{
await _pgCmd.DropTableAsync(tableDefinition.GetNameTuple(), ct).ConfigureAwait(false);
await _pgCmd.CreateTableAsync(tableDefinition, ct).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.Error(ex, "Couldn't create table '{TableName}'",
tableDefinition.GetFullName());
Console.WriteLine($"Couldn't create table '{tableDefinition.GetFullName()}'");
return false;
}
}

try
{
dataReader = _aDataReaderFactory.Get(tableDefinition.Rdbms);
var rows = await dataReader.ReadAsync(folder, tableDefinition, ct, emptyStringFlag).ConfigureAwait(false);
Console.WriteLine($"Read {rows} {"row".Plural(rows)} for table '{tableDefinition.GetFullName()}'");
_logger.Information($"Read {{Rows}} {"row".Plural(rows)} for table '{{TableName}}'",
rows, tableDefinition.GetFullName());
}
catch (Exception ex)
{
_logger.Error(ex, "Read data for table '{TableName}' failed",
tableDefinition.GetFullName());
Console.WriteLine($"Read data for table '{tableDefinition.GetFullName()}' failed");
return false;
}
if (!await InsertDataAsync(folder, tableDefinition, emptyStringFlag, ct)
.ConfigureAwait(false))
{
return false;
}

foreach (var columnName in tableDefinition.Columns
.Where(c => c.Identity != null)
.Select(c => c.Name))
{
try
{
await _systemTables.ResetIdentityAsync(tableDefinition.GetFullName(), columnName, ct).ConfigureAwait(false);
_logger.Information("Reset auto generation for {TableName}.{ColumnName}",
tableDefinition.GetFullName(), columnName);
}
catch (Exception ex)
{
_logger.Error(ex, "Reset auto generation for {TableName}.{ColumnName} failed",
tableDefinition.GetFullName(), columnName);
Console.WriteLine($"**ERROR**: Reset auto generation for {tableDefinition.GetFullName()}.{columnName} failed after all rows where inserted. This is a serious error! The auto generation will most likely produce duplicates.");
errorOccurred = true;
}
}
var isIdentityColumnsOk = await ResetIdentityColumnsAsync(tableDefinition, ct)
.ConfigureAwait(false);

var isIndexesOk = true;
if (!skipCreate)
{
isIndexesOk = await CreateIndexesAsync(tableDefinition, ct)
.ConfigureAwait(false);
}

return isIdentityColumnsOk && isIndexesOk;
}

private async Task<bool> RecreateTableAsync(
TableDefinition tableDefinition,
CancellationToken ct)
{
try
{
await _pgCmd.DropTableAsync(tableDefinition.GetNameTuple(), ct).ConfigureAwait(false);
await _pgCmd.CreateTableAsync(tableDefinition, ct).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.Error(ex, "Unexpected error for table '{TableName}'",
_logger.Error(ex, "Couldn't create table '{TableName}'",
tableDefinition.GetFullName());
Console.WriteLine($"Unexpected error for table '{tableDefinition.GetFullName()}'");
Console.WriteLine($"Couldn't create table '{tableDefinition.GetFullName()}'");
return false;
}

return true;
}

private async Task<bool> InsertDataAsync(
string folder,
TableDefinition tableDefinition,
EmptyStringFlag emptyStringFlag,
CancellationToken ct)
{
IADataReader? dataReader = null;
try
{
dataReader = _aDataReaderFactory.Get(tableDefinition.Rdbms);
var rows = await dataReader.ReadAsync(folder, tableDefinition, ct, emptyStringFlag).ConfigureAwait(false);
Console.WriteLine($"Read {rows} {"row".Plural(rows)} for table '{tableDefinition.GetFullName()}'");
_logger.Information($"Read {{Rows}} {"row".Plural(rows)} for table '{{TableName}}'",
rows, tableDefinition.GetFullName());
}
catch (Exception ex)
{
_logger.Error(ex, "Read data for table '{TableName}' failed",
tableDefinition.GetFullName());
Console.WriteLine($"Read data for table '{tableDefinition.GetFullName()}' failed");
return false;
}
finally
{
dataReader?.Dispose();
}

return true;
}

private async Task<bool> ResetIdentityColumnsAsync(
TableDefinition tableDefinition,
CancellationToken ct)
{
foreach (var columnName in tableDefinition.Columns
.Where(c => c.Identity != null)
.Select(c => c.Name))
{
try
{
await _systemTables.ResetIdentityAsync(tableDefinition.GetFullName(), columnName, ct)
.ConfigureAwait(false);
_logger.Information("Reset auto generation for {TableName}.{ColumnName}",
tableDefinition.GetFullName(), columnName);
}
catch (Exception ex)
{
_logger.Error(ex, "Reset auto generation for {TableName}.{ColumnName} failed",
tableDefinition.GetFullName(), columnName);
Console.WriteLine(
$"**ERROR**: Reset auto generation for {tableDefinition.GetFullName()}.{columnName} failed after all rows where inserted. This is a serious error! The auto generation will most likely produce duplicates.");
return false;
}
}

return true;
}

private async Task<bool> CreateIndexesAsync(
TableDefinition tableDefinition,
CancellationToken ct)
{
var errorOccurred = false;
await Parallel.ForEachAsync(tableDefinition.Indexes, ct, async (indexDefinition, _) =>
{
try
{
_logger.Information("Creating index '{IndexName}' for table '{TableName}'...",
indexDefinition.Header.Name, tableDefinition.GetFullName());
await _pgCmd.CreateIndexAsync(tableDefinition.GetNameTuple(), indexDefinition, ct).ConfigureAwait(false);
await _pgCmd.CreateIndexAsync(tableDefinition.GetNameTuple(), indexDefinition, ct)
.ConfigureAwait(false);
Console.WriteLine(
$"Created index '{indexDefinition.Header.Name}' for table '{tableDefinition.GetFullName()}'");
_logger.Information("Created index '{IndexName}' for table '{TableName}'",
Expand All @@ -199,6 +253,6 @@ public async Task RunAsync(Rdbms rdbms, CancellationToken ct)
}
}).ConfigureAwait(false);

return !errorOccurred;
return errorOccurred;
}
}
1 change: 1 addition & 0 deletions src/ABulkCopy.Common/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public struct Config
public const string SchemaFilter = "AppSettings:SchemaFilter";
public const string SearchFilter = "AppSettings:SearchFilter";
public const string EmptyString = "AppSettings:EmptyString";
public const string SkipCreate = "AppSettings:SkipCreate";

public const string PgKeywords = "PgKeywords";

Expand Down

0 comments on commit 5607180

Please sign in to comment.