Commit

Merge pull request #161 from rstudio/databricks
Databricks
edgararuiz committed Feb 26, 2024
2 parents 45672cf + 3435131 commit c5f0699
Showing 18 changed files with 2,966 additions and 1,033 deletions.
@@ -0,0 +1,14 @@
{
"hash": "30206ffec739a951e316b6fdc628d679",
"result": {
"markdown": "---\ntitle: Run R inside Databricks Connect\nformat:\n html:\n theme: default\n toc: true\nexecute:\n eval: true \n freeze: true\neditor: \n markdown: \n wrap: 72\n---\n\n\n\n\n*Last updated: Mon Feb 26 13:17:50 2024*\n\n## Intro\n\nSupport for `spark_apply()` is currently available in the development\nversions of `sparklyr`, and `pysparklyr`. To install, run the following:\n\n``` r\nremotes::install_github(\"sparklyr/sparklyr\")\nremotes::install_github(\"mlverse/pysparklyr\")\n```\n\nDatabricks Connect is now able to run regular Python code inside Spark.\n`sparklyr` takes advantage of this capability by having Python transport\nand run the R code. It does this via the `rpy2` Python library. Using\nthis library also guarantees Arrow support.\n\n::: {#fig-connect}\n\n```{mermaid}\n%%| fig-width: 6\n%%| eval: true\nflowchart LR\n subgraph mm[My machine]\n sp[R <br> ********** <br>sparklyr]\n rp[Python<br> **************** <br>rpy2 'packages'<br> the R code]\n end\n subgraph db[Databricks]\n subgraph sr[Spark]\n pt[Python<br> ********************* <br>rpy2 runs the R code]\n end\n end\n\nsp --> rp\nrp --> sr\n\nstyle mm fill:#fff,stroke:#666,color:#000\nstyle sp fill:#fff,stroke:#666,color:#000\nstyle rp fill:#fff,stroke:#666,color:#000\nstyle db fill:#fff,stroke:#666,color:#000\nstyle sr fill:#fff,stroke:#666,color:#000\nstyle pt fill:#fff,stroke:#666,color:#000\n```\n\n\nHow `sparklyr` uses rpy2 to run R code in Databricks Connect\n:::\n\n## Getting started\n\nIf you have been using `sparklyr` with Databricks Connect v2 already,\nthen after upgrading the packages, you will be prompted to install\n`rpy2` in your Python environment. The prompt will occur the first time\nyou use `spark_apply()` in an interactive R session. If this is the\nfirst time you are using `sparklyr` with Databricks Connect v2, please\nrefer to our intro article[\"Databricks Connect\nv2\"](/deployment/databricks-connect.qmd) to learn how to setup your\nenvironment.\n\nAs shown in the diagram on the previous section, `rpy2` is needed on the\nDatabricks cluster you plan to use. This means that you will need to\n\"manually\" install the library in the cluster. This is a simple\noperation that is done via your Databricks web portal. Here are the\ninstructions that shows you how to do that: [Databricks - Cluster\nLibraries](https://docs.databricks.com/en/libraries/cluster-libraries.html).\n\n## What is supported in `spark_apply()` - At a glance\n\n| Argument | Supported? | Notes |\n|---------|---------|------------------------------------------------------|\n| `x` | Yes | |\n| `f` | Yes | |\n| `columns` | Yes | Requires a string entry that contains the name of the column and its Spark variable type. Accepted values are: `long`, `decimal`, `string`, `datetime` and `bool`. Example: `columns = \"x long, y string\"`. If not provided, `sparklyr` will automatically create one, by examining the first 10 records of `x`, and it will provide a `columns` spec you can use when running `spark_apply()` again. See: [Providing a schema](#providing-a-schema) |\n| `memory` | Yes | |\n| `group_by` | Yes | |\n| `packages` | **No** | You will need to pre-install the needed R packages in your cluster via the Databricks web portal, see [R packages](#r-packages) |\n| `context` | **No** | |\n| `name` | Yes | |\n| `barrier` | Yes | Support only on ungrouped data. In other words, it is valid when the `group_by` argument is used. 
|\n| `fetch_result_as_sdf` | Yes | At this time, `spark_apply()` inside Databricks Connect only supports rectangular data, so seeing to `FALSE` will always return a data frame. |\n| `partition_index_param` | **No** | |\n| `arrow_max_records_per_batch` | Yes | Support only on ungrouped data. In other words, it is valid when the `group_by` argument is used. |\n| `auto_deps` | **No** | |\n| `...` | | |\n\n## R packages {#r-packages}\n\nIf your `spark_apply()` call uses specific R packages, you will need to\npre-install those specific packages in your target cluster. This is a\nsimple operation, because you can do this via your Databricks web\nportal, please see [Databricks - Cluster\nLibraries](https://docs.databricks.com/en/libraries/cluster-libraries.html)\nto learn how to do this.\n\n::: callout-caution\n## Only CRAN packages supported\n\nThe Databricks cluster library interface is able to source packages from\nCRAN only. This means that packages installed from GitHub, or another\nalternative sources, will not be available.\n:::\n\n#### Additional background\n\nIn previous implementation, `spark_apply()` was able to easily copy the\nlocally installed R packages in order to ensure that your code will run\nin the cluster. This was possible because R, and RStudio, was running in\none of the matching servers in the Spark cluster. Because `sparklyr` is\nrunning on a remote machine, more likely a laptop, this is no longer an\noption. In the vast majority of cases, the remote machine will be on\ndifferent a Operating System than the cluster. Additionally,\ntransmitting the unpacked, compiled, R packages would take a long time\nover a broadband Internet connection.\n\n## Providing a schema {#providing-a-schema}\n\nSpark requires the schema of the data that your R code will output. It\nexpects the name, and type of each field. The python\n\nPassing a schema in `columns` will make`spark_apply()` run faster.\nBecause if not provided, `sparklyr` has to collect the first 10 rows,\nand run the R code in order to try and determine the names and types of\nyour resulting data set. As a convenience, `sparklyr` will output a\nmessage with the schema it used as the schema. If you are going to rerun\nyour `spark_apply()` command again, you can copy and paste the output of\nthe message to you code.\n\n\n::: {.cell}\n\n```{.r .cell-code}\nspark_apply(\n tbl_mtcars,\n nrow,\n group_by = \"am\"\n)\n#> To increase performance, use the following schema:\n#> columns = \"am double, x long\"\n#> # Source: table<sparklyr_tmp_table_74ef0c15_0631_43ce_b0e4_43f6fd6d3f7c> [2 x 2]\n#> # Database: spark_connection\n#> am x\n#> <dbl> <dbl>\n#> 1 0 19\n#> 2 1 13\n```\n:::\n\n\nPassing the `columns` argument, silences the message:\n\n\n::: {.cell}\n\n```{.r .cell-code}\nspark_apply(\n tbl_mtcars,\n nrow,\n group_by = \"am\", \n columns = \"am double, x long\"\n)\n#> # Source: table<sparklyr_tmp_table_fb407bf9_94ac_4b57_9426_37ad2f56b9e4> [2 x 2]\n#> # Database: spark_connection\n#> am x\n#> <dbl> <dbl>\n#> 1 0 19\n#> 2 1 13\n```\n:::\n\n\n## Partition data\n\nTypically, with un-grouped data, the number of parallel jobs will\ncorrespond with the number of partitions of the data. For Databricks\nconnections, `sparklyr` will, by default, attempt to use Apache Arrow.\nThe Databricks Connect clusters come with Arrow installed. This approach\nalso changes how Spark will partition your data. Instead of the number\nof partitions, Spark will use the value in the \"Arrow Max Records per\nBach\" option. 
This option can be controlled directly in the\n`spark_apply()` call by setting the `arrow_max_records_per_batch`.\n\n\n::: {.cell}\n\n```{.r .cell-code}\nspark_apply(tbl_mtcars, nrow, arrow_max_records_per_batch = 4, columns = \"x long\")\n#> Changing spark.sql.execution.arrow.maxRecordsPerBatch to: 4\n#> # Source: table<sparklyr_tmp_table_9cdf51c8_0af6_40c0_ba78_49f1fa696224> [8 x 1]\n#> # Database: spark_connection\n#> x\n#> <dbl>\n#> 1 4\n#> 2 4\n#> 3 4\n#> 4 4\n#> 5 4\n#> 6 4\n#> 7 4\n#> 8 4\n```\n:::\n\n\nIf you pass a different Arrow Batch size than what the option is set to\ncurrently, `sparklyr` will change the value of that option, and will\nnotify you of that:\n\n\n::: {.cell}\n\n```{.r .cell-code}\nspark_apply(tbl_mtcars, nrow, arrow_max_records_per_batch = 2, columns = \"x long\")\n#> Changing spark.sql.execution.arrow.maxRecordsPerBatch to: 2\n#> # Source: table<sparklyr_tmp_table_b81ab0ff_ac22_4a1e_9ab7_22e4961525fd> [?? x 1]\n#> # Database: spark_connection\n#> x\n#> <dbl>\n#> 1 2\n#> 2 2\n#> 3 2\n#> 4 2\n#> 5 2\n#> 6 2\n#> 7 2\n#> 8 2\n#> 9 2\n#> 10 2\n#> # ℹ more rows\n```\n:::\n\n\n## Limitations\n\n`spark_apply()` **will only work on Databricks \"Single Access\" mode**.\n\"Shared Access\" mode does not currently support `mapInPandas()`, and\n`applyInPandas()` (see [Databricks - Access mode\nlimitations](https://docs.databricks.com/en/compute/access-mode-limitations.html#udf-limitations-for-unity-catalog-shared-access-mode)).\nThese are the Python functions that `sparklyr` uses to run the Python\ncode, which in turn runs the R code via `rpy2`.\n",
"supporting": [],
"filters": [
"rmarkdown/pagebreak.lua"
],
"includes": {},
"engineDependencies": {},
"preserve": {},
"postProcess": true
}
}
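
For quick reference, the workflow described in the article above boils down to the sketch below. It is not part of the diff; it assumes a working Databricks Connect v2 setup as covered in the linked "Databricks Connect v2" guide (`DATABRICKS_HOST` and `DATABRICKS_TOKEN` set, `rpy2` installed on the cluster), and the cluster ID shown is a placeholder.

``` r
# Minimal sketch of the workflow the new article describes (assumptions noted above).
library(sparklyr)

# Connect through Databricks Connect v2; the cluster ID below is a placeholder.
sc <- spark_connect(
  cluster_id = "0000-000000-example0",
  method     = "databricks_connect"
)

# Copy a small local data set to the cluster to experiment with.
tbl_mtcars <- copy_to(sc, mtcars)

# Run an R function per group; supplying `columns` up front skips the
# schema-sampling step and silences the schema message.
spark_apply(
  tbl_mtcars,
  nrow,
  group_by = "am",
  columns  = "am double, x long"
)

spark_disconnect(sc)
```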


8 changes: 6 additions & 2 deletions _quarto.yml
@@ -106,10 +106,14 @@ website:
href: deployment/yarn-cluster-emr.qmd
- text: Cloudera cluster
href: deployment/cloudera-aws.qmd
- - section: Databricks clusters
+ - section: Databricks Connect (v2)
contents:
- - text: Databricks Connect v2 (DBR 13+)
+ - text: Getting Started
href: deployment/databricks-connect.qmd
+ - text: Run R code in Databricks
+ href: deployment/databricks-connect-udfs.qmd
+ - text: Deploying to Posit Connect
+ href: deployment/databricks-posit-connect.qmd
- section: Stand Alone Clusters
contents:
- text: Qubole cluster