Skip to content

Commit

Permalink
Merge pull request #8806 from ClickHouse/vzakaznikov-issue_7878_3
Browse files Browse the repository at this point in the history
Reworking fix for issue 7878 (version 4)
  • Loading branch information
alexey-milovidov committed Jan 25, 2020
2 parents a21d371 + 0b43bd5 commit 9f3bbea
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 26 deletions.
23 changes: 2 additions & 21 deletions dbms/src/DataStreams/ConvertingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <Common/quoteString.h>
#include <Parsers/IAST.h>


namespace DB
{

Expand Down Expand Up @@ -64,18 +65,8 @@ ConvertingBlockInputStream::ConvertingBlockInputStream(
throw Exception("Cannot find column " + backQuote(res_elem.name) + " in source stream",
ErrorCodes::THERE_IS_NO_COLUMN);
break;

case MatchColumnsMode::NameOrDefault:
if (input_header.has(res_elem.name))
conversion[result_col_num] = input_header.getPositionByName(res_elem.name);
else
conversion[result_col_num] = USE_DEFAULT;
break;
}

if (conversion[result_col_num] == USE_DEFAULT)
continue;

const auto & src_elem = input_header.getByPosition(conversion[result_col_num]);

/// Check constants.
Expand Down Expand Up @@ -109,17 +100,8 @@ Block ConvertingBlockInputStream::readImpl()
Block res = header.cloneEmpty();
for (size_t res_pos = 0, size = conversion.size(); res_pos < size; ++res_pos)
{
auto & res_elem = res.getByPosition(res_pos);

if (conversion[res_pos] == USE_DEFAULT)
{
// Create a column with default values
auto column_with_defaults = res_elem.type->createColumn()->cloneResized(src.rows());
res_elem.column = std::move(column_with_defaults);
continue;
}

const auto & src_elem = src.getByPosition(conversion[res_pos]);
auto & res_elem = res.getByPosition(res_pos);

ColumnPtr converted = castColumnWithDiagnostic(src_elem, res_elem, context);

Expand All @@ -132,4 +114,3 @@ Block ConvertingBlockInputStream::readImpl()
}

}

5 changes: 1 addition & 4 deletions dbms/src/DataStreams/ConvertingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ class ConvertingBlockInputStream : public IBlockInputStream
/// Require same number of columns in source and result. Match columns by corresponding positions, regardless to names.
Position,
/// Find columns in source by their names. Allow excessive columns in source.
Name,
/// Find columns in source by their names if present else use the default. Allow excessive columns in source.
NameOrDefault
Name
};

ConvertingBlockInputStream(
Expand All @@ -50,7 +48,6 @@ class ConvertingBlockInputStream : public IBlockInputStream

/// How to construct result block. Position in source block, where to get each column.
using Conversion = std::vector<size_t>;
const size_t USE_DEFAULT = static_cast<size_t>(-1);
Conversion conversion;
};

Expand Down
17 changes: 16 additions & 1 deletion dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,24 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
StoragePtr inner_table = materialized_view->getTargetTable();
auto inner_table_id = inner_table->getStorageID();
query = materialized_view->getInnerQuery();

std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
insert->database = inner_table_id.database_name;
insert->table = inner_table_id.table_name;

/// Get list of columns we get from select query.
auto header = InterpreterSelectQuery(query, *views_context, SelectQueryOptions().analyze())
.getSampleBlock();

/// Insert only columns returned by select.
auto list = std::make_shared<ASTExpressionList>();
for (auto & column : header)
/// But skip columns which storage doesn't have.
if (inner_table->hasColumn(column.name))
list->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));

insert->columns = std::move(list);

ASTPtr insert_query_ptr(insert.release());
InterpreterInsertQuery interpreter(insert_query_ptr, *views_context);
BlockIO io = interpreter.execute();
Expand Down Expand Up @@ -230,7 +245,7 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
/// and two-level aggregation is triggered).
in = std::make_shared<SquashingBlockInputStream>(
in, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
in = std::make_shared<ConvertingBlockInputStream>(context, in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::NameOrDefault);
in = std::make_shared<ConvertingBlockInputStream>(context, in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name);
}
else
in = std::make_shared<OneBlockInputStream>(block);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
-- Create dictionary, since dictGet*() uses DB::Context in executeImpl()
-- (To cover scope of the Context in DB::PushingToViewsBlockOutputStream::process)
DROP TABLE IF EXISTS mv;
DROP DATABASE IF EXISTS dict_in_01023;
CREATE DATABASE dict_in_01023;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
DROP TABLE IF EXISTS mv;
DROP TABLE IF EXISTS mv_source;
DROP TABLE IF EXISTS mv_target;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
1
1
2
3
1 2
1 2
2 3
3 4
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
DROP TABLE IF EXISTS mv;
DROP TABLE IF EXISTS mv_source;
DROP TABLE IF EXISTS mv_target;

CREATE TABLE mv_source (`a` UInt64) ENGINE = MergeTree ORDER BY tuple();
CREATE TABLE mv_target (`a` UInt64) ENGINE = MergeTree ORDER BY tuple();

CREATE MATERIALIZED VIEW mv TO mv_target AS SELECT * FROM mv_source;

INSERT INTO mv_source VALUES (1);

ALTER TABLE mv_target ADD COLUMN b UInt8 DEFAULT a + 1;
INSERT INTO mv_source VALUES (1),(2),(3);

SELECT * FROM mv ORDER BY a;
SELECT * FROM mv_target ORDER BY a;

0 comments on commit 9f3bbea

Please sign in to comment.