Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing refresh materialized view statement #97

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ CREATE MATERIALIZED VIEW [IF NOT EXISTS] name
AS <query>
WITH ( options )

REFRESH MATERIALIZED VIEW name

SHOW MATERIALIZED [VIEW|VIEWS] IN catalog[.database]

[DESC|DESCRIBE] MATERIALIZED VIEW name
Expand All @@ -213,6 +215,8 @@ SELECT
FROM alb_logs
GROUP BY TUMBLE(time, '1 Minute')

REFRESH MATERIALIZED VIEW alb_logs_metrics

SHOW MATERIALIZED VIEWS IN spark_catalog.default

DESC MATERIALIZED VIEW alb_logs_metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ dropCoveringIndexStatement

materializedViewStatement
: createMaterializedViewStatement
| refreshMaterializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| dropMaterializedViewStatement
Expand All @@ -90,6 +91,10 @@ createMaterializedViewStatement
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshMaterializedViewStatement
: REFRESH MATERIALIZED VIEW mvName=multipartIdentifier
;

showMaterializedViewStatement
: SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Command
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.types.StringType

Expand All @@ -25,7 +26,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
self: SparkSqlAstBuilder =>

override def visitCreateMaterializedViewStatement(
ctx: CreateMaterializedViewStatementContext): AnyRef = {
ctx: CreateMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val mvName = getFullTableName(flint, ctx.mvName)
val query = getMvQuery(ctx.query)
Expand All @@ -50,8 +51,17 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
}
}

override def visitRefreshMaterializedViewStatement(
ctx: RefreshMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val flintIndexName = getFlintIndexName(flint, ctx.mvName)
flint.refreshIndex(flintIndexName, RefreshMode.FULL)
Seq.empty
}
}

override def visitShowMaterializedViewStatement(
ctx: ShowMaterializedViewStatementContext): AnyRef = {
ctx: ShowMaterializedViewStatementContext): Command = {
val outputSchema = Seq(
AttributeReference("materialized_view_name", StringType, nullable = false)())

Expand All @@ -67,7 +77,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
}

override def visitDescribeMaterializedViewStatement(
ctx: DescribeMaterializedViewStatementContext): AnyRef = {
ctx: DescribeMaterializedViewStatementContext): Command = {
val outputSchema = Seq(
AttributeReference("output_col_name", StringType, nullable = false)(),
AttributeReference("data_type", StringType, nullable = false)())
Expand All @@ -86,7 +96,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
}

override def visitDropMaterializedViewStatement(
ctx: DropMaterializedViewStatementContext): AnyRef = {
ctx: DropMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
flint.deleteIndex(getFlintIndexName(flint, ctx.mvName))
Seq.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
|""".stripMargin)
}

// TODO: fix this windowing function unable to be used in GROUP BY
ignore("full refresh materialized view") {
test("full refresh materialized view") {
flint
.materializedView()
.name(testMvName)
Expand All @@ -104,12 +103,12 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {

val indexData = flint.queryIndex(testFlintIndex)
checkAnswer(
indexData,
indexData.select("startTime", "count"),
Seq(
Row(timestamp("2023-10-01 00:00:00"), 1),
Row(timestamp("2023-10-01 00:10:00"), 2),
Row(timestamp("2023-10-01 01:00:00"), 1),
Row(timestamp("2023-10-01 02:00:00"), 1)))
Row(timestamp("2023-10-01 03:00:00"), 1)))
}

test("incremental refresh materialized view") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE

class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {

Expand Down Expand Up @@ -128,6 +129,23 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
}

test("create materialized view with manual refresh") {
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
| WITH (
| auto_refresh = false
| )
|""".stripMargin)

val indexData = spark.read.format(FLINT_DATASOURCE).load(testFlintIndex)
flint.describeIndex(testFlintIndex) shouldBe defined
indexData.count() shouldBe 0

sql(s"REFRESH MATERIALIZED VIEW $testMvName")
indexData.count() shouldBe 4
}

test("create materialized view if not exists") {
sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
flint.describeIndex(testFlintIndex) shouldBe defined
Expand Down
Loading