| Title: | Query Data in 'Microsoft Fabric' |
|---|---|
| Description: | Query data hosted in 'Microsoft Fabric'. Provides helpers to open 'DBI' connections to 'SQL' endpoints of 'Lakehouse' and 'Data Warehouse' items; submit 'Data Analysis Expressions' ('DAX') queries to semantic model datasets in 'Microsoft Fabric' and 'Power BI'; read 'Delta Lake' tables stored in 'OneLake' ('Azure Data Lake Storage Gen2'); and execute 'Spark' code via the 'Livy API'. |
| Authors: | Luka Koning [aut, cre, cph], Kennispunt Twente [fnd] |
| Maintainer: | Luka Koning <[email protected]> |
| License: | MIT + file LICENSE |
| Version: | 0.2.1.9000 |
| Built: | 2026-06-03 11:23:20 UTC |
| Source: | https://github.com/kennispunttwente/fabricqueryr |
High-level helper that creates a Livy session in Microsoft Fabric, waits for it to become idle, submits a statement containing the supplied code (Spark, PySpark, SparkR or SQL), retrieves the result, and closes the session.
```r
fabric_livy_query(
  livy_url,
  code,
  kind = c("spark", "pyspark", "sparkr", "sql"),
  tenant_id = Sys.getenv("FABRICQUERYR_TENANT_ID"),
  client_id = Sys.getenv("FABRICQUERYR_CLIENT_ID", unset = "04b07795-8ddb-461a-bbee-02f9e1bf7b46"),
  access_token = NULL,
  environment_id = NULL,
  conf = NULL,
  verbose = TRUE,
  poll_interval = 2L,
  timeout = 600L
)
```
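| Argument | Description |
|---|---|
| livy_url | Character. Livy session job connection string, e.g. "https://api.fabric.microsoft.com/v1/workspaces/.../lakehouses/.../livyapi/...". |
| code | Character. Code to run in the Livy session. |
| kind | Character. One of "spark", "pyspark", "sparkr" or "sql"; defaults to "spark". |
| tenant_id | Microsoft Azure tenant ID. Defaults to Sys.getenv("FABRICQUERYR_TENANT_ID"). |
| client_id | Microsoft Azure application (client) ID used to authenticate. Defaults to Sys.getenv("FABRICQUERYR_CLIENT_ID", unset = "04b07795-8ddb-461a-bbee-02f9e1bf7b46") (the Azure CLI's public client ID). |
| access_token | Optional character. If supplied, this bearer token is used instead of acquiring a new one with AzureAuth. |
| environment_id | Optional character. Fabric Environment (pool) ID to use for the session. Defaults to NULL. |
| conf | Optional list of Spark configuration settings to apply to the session. |
| verbose | Logical. Emit progress messages. Defaults to TRUE. |
| poll_interval | Integer. Polling interval in seconds while waiting for the session or statement to become ready. Defaults to 2. |
| timeout | Integer. Timeout in seconds while waiting for the session or statement to become ready. Defaults to 600. |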
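The waits controlled by poll_interval and timeout follow a poll-until-ready pattern. The sketch below is a minimal, hypothetical illustration of that pattern, not fabricqueryr's internal code: wait_for_state() and the fake state function are invented names, and the state names come from the Apache Livy REST API documentation (a session is ready when "idle", a statement when "available").

```r
# Hypothetical helper (not part of fabricqueryr): call get_state() until it
# returns one of the `target` states, fail fast on a `failed` state, and stop
# once `timeout` seconds have elapsed.
wait_for_state <- function(get_state, target = "idle",
                           failed = c("error", "dead", "killed"),
                           poll_interval = 2L, timeout = 600L) {
  deadline <- Sys.time() + timeout
  repeat {
    state <- get_state()
    if (state %in% target) return(state)
    if (state %in% failed) stop("Reached failure state: ", state, call. = FALSE)
    if (Sys.time() >= deadline) {
      stop("Timed out after ", timeout, " s; last state: ", state, call. = FALSE)
    }
    Sys.sleep(poll_interval)
  }
}

# Usage with a fake session that becomes idle on the third poll
states <- c("not_started", "starting", "idle")
polls <- 0
fake_session_state <- function() {
  polls <<- polls + 1
  states[min(polls, length(states))]
}
wait_for_state(fake_session_state, poll_interval = 0)
```

In a real client, get_state would wrap a GET request to the session or statement resource; for statements you would pass target = "available" and failed = c("error", "cancelled").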
In Microsoft Fabric, you can find and copy the Livy session URL by opening a 'Lakehouse' item and going to 'Settings' -> 'Livy Endpoint' -> 'Session job connection string'.
By default we request a token for https://api.fabric.microsoft.com/.default.
AzureAuth is used to acquire the token. Be aware that AzureAuth caches
tokens; if you run into authentication issues, you may want to call
AzureAuth::clean_token_directory() to clear the cached tokens.
A list with statement details and results, containing:

- id: Statement ID.
- state: Final statement state (should be "available").
- started_local: Local timestamp when the statement started running.
- completed_local: Local timestamp when the statement completed.
- duration_sec: Duration in seconds, measured locally.
- output: A list with raw output details:
  - status: Output status (e.g. "ok").
  - execution_count: Execution count (if applicable), i.e. the number of statements that have been executed in the session.
  - data: Raw data list with MIME types as keys (e.g. "text/plain", "application/json").
- parsed: Parsed output, if possible. This may be a data frame (tibble) if the output was tabular JSON data, or a character vector if it was plain text. NULL if parsing was not possible.
- url: URL of the statement resource in the Livy API.
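As an illustration of how SQL output can be turned into a data frame without tidyverse dependencies, here is a base-R sketch. It assumes (as in this function's SQL example) that output$data[["application/json"]] has been parsed into a list whose schema$fields element has name and type columns and whose data element is a character matrix without column names. livy_json_to_df() is a hypothetical helper, not part of the package.

```r
# Hypothetical helper (not part of fabricqueryr): convert a parsed Livy SQL
# payload (schema fields + unnamed data matrix) into a base data frame.
livy_json_to_df <- function(payload) {
  cols  <- as.character(payload$schema$fields$name)
  types <- as.character(payload$schema$fields$type)
  df <- as.data.frame(payload$data, stringsAsFactors = FALSE)
  names(df) <- cols
  for (j in seq_along(cols)) {
    x <- df[[j]]
    df[[j]] <- switch(types[j],
      long    = as.numeric(x),   # doubles avoid 32-bit integer overflow
      integer = as.integer(x),
      double  = as.numeric(x),
      boolean = as.logical(x),   # accepts "true"/"false"
      as.character(x)            # strings and any unhandled types
    )
  }
  df
}

payload <- list(
  schema = list(fields = data.frame(
    name = c("id", "score", "active", "label"),
    type = c("long", "double", "boolean", "string")
  )),
  data = matrix(c("1", "2.5", "true",  "a",
                  "2", "3",   "false", "b"), nrow = 2, byrow = TRUE)
)
livy_json_to_df(payload)
```

Mapping Spark's "long" to double rather than integer is a deliberate choice in this sketch, since R integers are 32-bit.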
Livy API overview - Microsoft Fabric - 'What is the Livy API for Data Engineering?'; Livy Docs - REST API.
```r
# Find your session URL in Fabric by going to a 'Lakehouse' item,
# then go to 'Settings' -> 'Livy Endpoint' -> 'Session job connection string'
sess_url <- "https://api.fabric.microsoft.com/v1/workspaces/.../lakehouses/.../livyapi/..."

# Livy API can run SQL, SparkR, PySpark, & Spark
# Below are examples of 1) SQL & 2) SparkR usage
# Example is not executed since it requires configured credentials for Fabric
## Not run:
## 1 Livy & SQL
# Here we run SQL remotely in Microsoft Fabric with Spark, to get data to local R
# Since the Livy API cannot directly return a proper data frame,
# we build it from the returned schema & matrix
library(dplyr)

# Run Livy SQL query
livy_sql_result <- fabric_livy_query(
  livy_url = sess_url,
  kind = "sql",
  code = "SELECT * FROM Patienten LIMIT 1000",
  tenant_id = Sys.getenv("FABRICQUERYR_TENANT_ID"),
  client_id = Sys.getenv("FABRICQUERYR_CLIENT_ID")
)

# '$schema$fields' contains column info, & '$data' contains data as matrix without column names
payload <- livy_sql_result$output$data[["application/json"]]
schema <- as_tibble(payload$schema$fields) # has columns: name, type, nullable
col_nms <- schema$name

# Build data frame (tibble) from the Livy result
df_livy_sql <- payload$data |>
  as_tibble(.name_repair = "minimal") |>
  setNames(col_nms) |>
  mutate(
    # cast by schema$type (add more cases if your schema includes them);
    # 'long' values outside the 32-bit range would need readr::parse_double
    across(all_of(schema$name[schema$type == "long"]), readr::parse_integer),
    across(all_of(schema$name[schema$type == "double"]), readr::parse_double),
    across(all_of(schema$name[schema$type == "boolean"]), readr::parse_logical),
    across(all_of(schema$name[schema$type == "string"]), as.character)
  )

## 2 Livy & SparkR
# Here we run R code remotely in Microsoft Fabric with SparkR, to get data to local R
# Since the Livy API cannot directly return a proper data frame,
# we encode to base64 in SparkR and decode in local R
livy_sparkr_result <- fabric_livy_query(
  livy_url = sess_url,
  kind = "sparkr",
  code = paste(
    # Obtain data in remote R (SparkR)
    'library(SparkR); library(base64enc)',
    'df <- sql("SELECT * FROM Patienten") |> limit(1000L) |> collect()',
    # serialize -> gzip -> base64
    'r_raw <- serialize(df, connection = NULL)',
    'raw_gz <- memCompress(r_raw, type = "gzip")',
    'b64 <- base64enc::base64encode(raw_gz)',
    # output marked B64 string
    'cat("<<B64RDS>>", b64, "<<END>>", sep = "")',
    sep = "\n"
  ),
  tenant_id = Sys.getenv("FABRICQUERYR_TENANT_ID"),
  client_id = Sys.getenv("FABRICQUERYR_CLIENT_ID")
)

# Extract marked B64 string from Livy output
txt <- livy_sparkr_result$output$data$`text/plain`
b64 <- sub('.*<<B64RDS>>', '', txt)
b64 <- sub('<<END>>.*', '', b64)

# Decode to data frame
raw_gz <- base64enc::base64decode(b64)
r_raw <- memDecompress(raw_gz, type = "gzip")
df_livy_sparkr <- unserialize(r_raw)
## End(Not run)
```
Authenticates to OneLake (ADLS Gen2), reads the table's
_delta_log to determine the currently active Parquet parts,
downloads only those parts to a local staging directory, and
returns the result as a tibble.
```r
fabric_onelake_read_delta_table(
  table_path,
  workspace_name,
  lakehouse_name,
  schema = NULL,
  tenant_id = Sys.getenv("FABRICQUERYR_TENANT_ID"),
  client_id = Sys.getenv("FABRICQUERYR_CLIENT_ID", unset = "04b07795-8ddb-461a-bbee-02f9e1bf7b46"),
  dest_dir = NULL,
  verbose = TRUE,
  dfs_base = "https://onelake.dfs.fabric.microsoft.com"
)
```
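| Argument | Description |
|---|---|
| table_path | Character. Table name or nested path (e.g. "Patients/PatientInfo"). |
| workspace_name | Character. Fabric workspace display name or GUID (this is the ADLS filesystem/container name). |
| lakehouse_name | Character. Lakehouse item name, with or without the ".Lakehouse" suffix (e.g. "Lakehouse" or "Lakehouse.Lakehouse"). |
| schema | Character or NULL. Schema name (e.g. "dbo") for schema-enabled lakehouses, where tables are stored under Tables/<schema>/<table>. Defaults to NULL. |
| tenant_id | Character. Entra ID (Azure AD) tenant GUID. Defaults to Sys.getenv("FABRICQUERYR_TENANT_ID"). |
| client_id | Character. App registration (client) ID. Defaults to Sys.getenv("FABRICQUERYR_CLIENT_ID", unset = "04b07795-8ddb-461a-bbee-02f9e1bf7b46") (the Azure CLI's public client ID). |
| dest_dir | Character or NULL. Local staging directory for the downloaded Parquet parts. Defaults to NULL. |
| verbose | Logical. Print progress messages. Defaults to TRUE. |
| dfs_base | Character. OneLake DFS endpoint. Defaults to "https://onelake.dfs.fabric.microsoft.com". |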
In Microsoft Fabric, OneLake exposes each workspace as an ADLS Gen2
filesystem. Within a Lakehouse item, Delta tables are stored under
Tables/<table> (lakehouse without schemas) or Tables/<schema>/<table>
(schema-enabled lakehouse), each with a _delta_log/ directory that records
the table's commit history. This helper replays the JSON commits in that
directory so that Parquet files which were later compacted or removed are
not read (and counted) twice.
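The replay step can be sketched as follows. This is a simplified illustration, not the package's implementation: active_files() is a hypothetical helper, the commits are assumed to be already parsed from the JSON commit files and ordered by version, and real Delta readers must also handle checkpoint files and other parts of the Delta protocol that this sketch ignores.

```r
# Simplified, hypothetical sketch of Delta log replay (not fabricqueryr's code).
# `commits` is a list of commits in version order; each commit is a list of
# already-parsed actions, e.g. list(add = list(path = "part-0.parquet")).
active_files <- function(commits) {
  active <- character(0)
  for (commit in commits) {
    for (action in commit) {
      if (!is.null(action$add)) {
        active <- union(active, action$add$path)       # file becomes live
      } else if (!is.null(action$remove)) {
        active <- setdiff(active, action$remove$path)  # file was compacted/deleted
      }
      # other actions (metaData, protocol, commitInfo, ...) are ignored here
    }
  }
  active
}

# Version 0 writes two files; version 1 compacts them into one
commits <- list(
  list(list(add = list(path = "part-a.parquet")),
       list(add = list(path = "part-b.parquet"))),
  list(list(remove = list(path = "part-a.parquet")),
       list(remove = list(path = "part-b.parquet")),
       list(add = list(path = "part-c.parquet")),
       list(commitInfo = list(operation = "OPTIMIZE")))
)
active_files(commits)
```

Only part-c.parquet survives, so rows from the compacted files are not read twice.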
Schema-enabled lakehouses (the default for new lakehouses) organise
tables into named schemas. Supply the schema argument (e.g. "dbo")
to read a table stored under a specific schema.
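A hypothetical helper that assembles the OneLake DFS URL of a table's _delta_log/ directory from the arguments of fabric_onelake_read_delta_table() might look like this. It assumes a <workspace>/<item>.Lakehouse/Tables/... path layout below dfs_base; it is a sketch, not the package's code.

```r
# Hypothetical helper (not part of fabricqueryr): OneLake DFS URL of a table's
# _delta_log/ directory, assuming the <workspace>/<item>.Lakehouse/Tables/... layout.
delta_log_url <- function(workspace_name, lakehouse_name, table_path,
                          schema = NULL,
                          dfs_base = "https://onelake.dfs.fabric.microsoft.com") {
  # Accept the lakehouse name with or without the ".Lakehouse" suffix
  if (!grepl("\\.Lakehouse$", lakehouse_name)) {
    lakehouse_name <- paste0(lakehouse_name, ".Lakehouse")
  }
  # schema = NULL simply drops out of c(), giving Tables/<table>
  parts <- c(workspace_name, lakehouse_name, "Tables", schema, table_path, "_delta_log")
  # Percent-encode spaces etc.; "/" inside nested table paths is kept
  parts <- vapply(parts, utils::URLencode, character(1), USE.NAMES = FALSE)
  paste(c(sub("/+$", "", dfs_base), parts), collapse = "/")
}

delta_log_url("PatientsWorkspace", "Lakehouse", "PatientInfo", schema = "dbo")
```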
Ensure the account/principal you authenticate with has access via Lakehouse -> Manage OneLake data access (or is a member of the workspace).
AzureAuth is used to acquire the token. Be aware that AzureAuth caches
tokens; if you run into authentication issues, you may want to call
AzureAuth::clean_token_directory() to clear the cached tokens.
A tibble with the table's current rows (0 rows if the table is empty).
```r
# Example is not executed since it requires configured credentials for Fabric
## Not run:
df <- fabric_onelake_read_delta_table(
  table_path = "Patients/PatientInfo",
  workspace_name = "PatientsWorkspace",
  lakehouse_name = "Lakehouse.Lakehouse",
  tenant_id = Sys.getenv("FABRICQUERYR_TENANT_ID"),
  client_id = Sys.getenv("FABRICQUERYR_CLIENT_ID")
)
dplyr::glimpse(df)

# Schema-enabled lakehouse: read from Tables/dbo/PatientInfo
df2 <- fabric_onelake_read_delta_table(
  table_path = "PatientInfo",
  workspace_name = "PatientsWorkspace",
  lakehouse_name = "Lakehouse.Lakehouse",
  schema = "dbo"
)
## End(Not run)
```
High-level helper that authenticates against Azure AD, resolves the workspace & dataset from a Power BI (Microsoft Fabric) XMLA/connection string, executes a DAX statement via the Power BI REST API, and returns a tibble with the resulting data.
```r
fabric_pbi_dax_query(
  connstr,
  dax,
  tenant_id = Sys.getenv("FABRICQUERYR_TENANT_ID"),
  client_id = Sys.getenv("FABRICQUERYR_CLIENT_ID", unset = "04b07795-8ddb-461a-bbee-02f9e1bf7b46"),
  include_nulls = TRUE,
  api_base = "https://api.powerbi.com/v1.0/myorg"
)
```
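| Argument | Description |
|---|---|
| connstr | Character. Power BI connection string, e.g. "Data Source=powerbi://api.powerbi.com/v1.0/myorg/My Workspace;Initial Catalog=SalesModel;". |
| dax | Character scalar with a valid DAX query, e.g. "EVALUATE TOPN(1000, 'Customers')". |
| tenant_id | Microsoft Azure tenant ID. Defaults to Sys.getenv("FABRICQUERYR_TENANT_ID"). |
| client_id | Microsoft Azure application (client) ID used to authenticate. Defaults to Sys.getenv("FABRICQUERYR_CLIENT_ID", unset = "04b07795-8ddb-461a-bbee-02f9e1bf7b46") (the Azure CLI's public client ID). |
| include_nulls | Logical. Passed through to the serializer settings (includeNulls) of the REST request. If TRUE (the default), null values are included in the response; if FALSE, they are omitted. |
| api_base | Character. API base URL. Defaults to "https://api.powerbi.com/v1.0/myorg"; 'myorg' works for most use cases and rarely needs to be changed. |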
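For orientation, the REST call behind this function can be sketched as assembling a request to the Power BI "Datasets - Execute Queries" endpoint. dax_request() is a hypothetical helper; it assumes the workspace (group) and dataset IDs have already been resolved, and the body shape (queries plus serializerSettings.includeNulls) follows Microsoft's REST API documentation rather than fabricqueryr's source.

```r
# Hypothetical helper (not part of fabricqueryr): URL and body of a
# "Datasets - Execute Queries" request for already-resolved IDs.
dax_request <- function(group_id, dataset_id, dax, include_nulls = TRUE,
                        api_base = "https://api.powerbi.com/v1.0/myorg") {
  list(
    url = sprintf("%s/groups/%s/datasets/%s/executeQueries",
                  api_base, group_id, dataset_id),
    body = list(
      queries = list(list(query = dax)),  # a single DAX query
      serializerSettings = list(includeNulls = include_nulls)
    )
  )
}

req <- dax_request("<group-id>", "<dataset-id>", "EVALUATE TOPN(1000, 'Customers')")
req$url
```

To send it, the body could be serialized with jsonlite::toJSON(req$body, auto_unbox = TRUE) and POSTed with a bearer token; that step needs credentials and is not shown.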
In Microsoft Fabric/Power BI, you can find and copy the connection string by opening a 'Semantic model' item and going to 'File' -> 'Settings' -> 'Server settings'. Ensure that the account you authenticate with has access to the workspace, or has been granted 'Build' permission on the dataset (via sharing).
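Resolving the workspace and dataset starts from the names in the connection string. A minimal, hypothetical parser (parse_pbi_connstr(), not a package function) could split the key=value pairs and strip the powerbi://<host>/v1.0/<org>/ prefix from Data Source; turning these names into IDs would require further REST calls, which are not shown.

```r
# Hypothetical helper (not part of fabricqueryr): extract workspace and
# dataset names from a Power BI connection string.
parse_pbi_connstr <- function(connstr) {
  parts <- trimws(strsplit(connstr, ";", fixed = TRUE)[[1]])
  parts <- parts[nzchar(parts)]
  keys <- tolower(trimws(sub("=.*$", "", parts)))  # text before the first "="
  vals <- trimws(sub("^[^=]*=", "", parts))        # text after the first "="
  kv <- setNames(as.list(vals), keys)
  # Data Source looks like powerbi://<host>/v1.0/<org>/<workspace name>
  workspace <- sub("^powerbi://[^/]+/v1\\.0/[^/]+/", "", kv[["data source"]])
  list(workspace = utils::URLdecode(workspace),
       dataset = kv[["initial catalog"]])
}

parse_pbi_connstr(
  "Data Source=powerbi://api.powerbi.com/v1.0/myorg/My Workspace;Initial Catalog=SalesModel;"
)
```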
AzureAuth is used to acquire the token. Be aware that AzureAuth caches
tokens; if you run into authentication issues, you may want to call
AzureAuth::clean_token_directory() to clear the cached tokens.
A tibble with the query result (0 rows if the DAX query returned no rows).
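The Execute Queries response nests rows as results -> tables -> rows, with each row a JSON object keyed by names such as "Customers[Name]". The base-R sketch below flattens such rows into a data frame, filling NA where a key is missing (as happens with include_nulls = FALSE). dax_rows_to_df() is hypothetical, and stripping the table prefix from column names is a design choice of this sketch, not necessarily the package's.

```r
# Hypothetical helper (not part of fabricqueryr): flatten Execute Queries rows
# (results[[1]]$tables[[1]]$rows) into a data frame.
dax_rows_to_df <- function(rows) {
  cols <- unique(unlist(lapply(rows, names)))
  out <- lapply(cols, function(col) {
    # A missing key becomes NA (e.g. a null omitted with includeNulls = FALSE)
    unlist(lapply(rows, function(r) if (is.null(r[[col]])) NA else r[[col]]))
  })
  # "Customers[Name]" -> "Name"; make.unique() guards against clashes
  names(out) <- make.unique(sub("^.*\\[(.*)\\]$", "\\1", cols))
  data.frame(out, check.names = FALSE, stringsAsFactors = FALSE)
}

rows <- list(
  list(`Customers[Name]` = "Ann", `Customers[Age]` = 34),
  list(`Customers[Name]` = "Bob")   # Age omitted, as with includeNulls = FALSE
)
dax_rows_to_df(rows)
```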
```r
# Example is not executed since it requires configured credentials for Fabric
## Not run:
conn <- "Data Source=powerbi://api.powerbi.com/v1.0/myorg/My Workspace;Initial Catalog=SalesModel;"
df <- fabric_pbi_dax_query(
  connstr = conn,
  dax = "EVALUATE TOPN(1000, 'Customers')",
  tenant_id = Sys.getenv("FABRICQUERYR_TENANT_ID"),
  client_id = Sys.getenv("FABRICQUERYR_CLIENT_ID")
)
dplyr::glimpse(df)
## End(Not run)
```
Opens a DBI/ODBC connection to a Microsoft Fabric Data Warehouse or Lakehouse SQL endpoint, authenticating with Azure AD (MSAL v2) and passing an access token to the ODBC driver.
```r
fabric_sql_connect(
  server,
  database = "Lakehouse",
  tenant_id = Sys.getenv("FABRICQUERYR_TENANT_ID"),
  client_id = Sys.getenv("FABRICQUERYR_CLIENT_ID", unset = "04b07795-8ddb-461a-bbee-02f9e1bf7b46"),
  access_token = NULL,
  odbc_driver = getOption("fabricqueryr.sql.driver", "ODBC Driver 18 for SQL Server"),
  port = 1433L,
  encrypt = "yes",
  trust_server_certificate = "no",
  timeout = 30L,
  verbose = TRUE,
  ...
)
```
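| Argument | Description |
|---|---|
| server | Character. Microsoft Fabric SQL connection string, or a DSN-less Server=... string (which is normalized). |
| database | Character. Database name. Defaults to "Lakehouse". |
| tenant_id | Character. Entra ID (Azure AD) tenant GUID. Defaults to Sys.getenv("FABRICQUERYR_TENANT_ID"). |
| client_id | Character. App registration (client) ID. Defaults to Sys.getenv("FABRICQUERYR_CLIENT_ID", unset = "04b07795-8ddb-461a-bbee-02f9e1bf7b46") (the Azure CLI's public client ID). |
| access_token | Optional character. If supplied, this bearer token is used instead of acquiring a new one with AzureAuth. |
| odbc_driver | Character. ODBC driver name. Defaults to getOption("fabricqueryr.sql.driver", "ODBC Driver 18 for SQL Server"). |
| port | Integer. TCP port. Defaults to 1433. |
| encrypt, trust_server_certificate | Character flags passed to ODBC. Default to "yes" and "no", respectively. |
| timeout | Integer. Login/connect timeout in seconds. Defaults to 30. |
| verbose | Logical. Emit progress messages. Defaults to TRUE. |
| ... | Additional arguments forwarded to the underlying DBI/ODBC connection call. |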
server is the Microsoft Fabric SQL connection string, e.g.
"xxxx.datawarehouse.fabric.microsoft.com".
You can find it by opening your Lakehouse or Data Warehouse item and
going to Settings -> SQL analytics endpoint -> SQL connection string.
You may also pass a DSN-less Server=... string; it will be normalized.
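The following sketch illustrates what normalizing a Server=... value and assembling a DSN-less ODBC connection string could look like. normalize_server() and build_odbc_string() are hypothetical helpers, not fabricqueryr functions, and the access token is deliberately left out of the string: with the odbc package it can be supplied separately, for example through dbConnect()'s attributes = list(azure_token = ...) argument.

```r
# Hypothetical helpers (not part of fabricqueryr).
# Reduce "Server=tcp:<host>,1433;..." (or a bare host) to the host name.
normalize_server <- function(server) {
  s <- trimws(server)
  s <- sub("^Server=", "", s, ignore.case = TRUE)
  s <- sub(";.*$", "", s)                  # drop anything after the first ";"
  s <- sub("^tcp:", "", s, ignore.case = TRUE)
  sub(",[0-9]+$", "", s)                   # drop a trailing ",<port>"
}

# Assemble a DSN-less connection string; the access token is not included.
build_odbc_string <- function(server, database = "Lakehouse",
                              driver = "ODBC Driver 18 for SQL Server",
                              port = 1433L, encrypt = "yes",
                              trust_server_certificate = "no") {
  sprintf("Driver={%s};Server=%s,%d;Database=%s;Encrypt=%s;TrustServerCertificate=%s;",
          driver, normalize_server(server), as.integer(port), database,
          encrypt, trust_server_certificate)
}

build_odbc_string("Server=tcp:xxxx.datawarehouse.fabric.microsoft.com,1433;")
```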
By default we request a token for
https://database.windows.net/.default.
AzureAuth is used to acquire the token. Be aware that AzureAuth caches
tokens; if you run into authentication issues, you may want to call
AzureAuth::clean_token_directory() to clear the cached tokens.
A live DBIConnection object.
```r
# Example is not executed since it requires configured credentials for Fabric
## Not run:
con <- fabric_sql_connect(
  server = "2gxz...qiy.datawarehouse.fabric.microsoft.com",
  database = "Lakehouse",
  tenant_id = Sys.getenv("FABRICQUERYR_TENANT_ID"),
  client_id = Sys.getenv("FABRICQUERYR_CLIENT_ID")
)

# List databases
DBI::dbGetQuery(con, "SELECT name FROM sys.databases")

# List tables
DBI::dbGetQuery(con, "
  SELECT TABLE_SCHEMA, TABLE_NAME
  FROM INFORMATION_SCHEMA.TABLES
  WHERE TABLE_TYPE = 'BASE TABLE'
")

# Get a table
df <- DBI::dbReadTable(con, "Customers")
dplyr::glimpse(df)

DBI::dbDisconnect(con)
## End(Not run)
```
Convenience wrapper that opens a connection with
fabric_sql_connect(), executes sql, and returns a tibble. The
connection is closed on exit.
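The open-run-close behaviour can be sketched with on.exit(), which runs the cleanup even when the query fails. with_connection() is a hypothetical helper used only for illustration, not the package's implementation.

```r
# Hypothetical helper (not part of fabricqueryr): open a connection, run a
# function on it, and always close it, even if run() throws an error.
with_connection <- function(open, close, run) {
  con <- open()
  on.exit(close(con), add = TRUE)
  run(con)
}

# Usage with stand-in functions instead of a real database
closed <- FALSE
with_connection(
  open  = function() "fake-connection",
  close = function(con) closed <<- TRUE,
  run   = function(con) paste(con, "queried")
)
```

With real credentials, open could be function() fabric_sql_connect(server), close DBI::dbDisconnect, and run function(con) DBI::dbGetQuery(con, sql).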
```r
fabric_sql_query(
  server,
  sql,
  database = "Lakehouse",
  tenant_id = Sys.getenv("FABRICQUERYR_TENANT_ID"),
  client_id = Sys.getenv("FABRICQUERYR_CLIENT_ID", unset = "04b07795-8ddb-461a-bbee-02f9e1bf7b46"),
  access_token = NULL,
  odbc_driver = getOption("fabricqueryr.sql.driver", "ODBC Driver 18 for SQL Server"),
  port = 1433L,
  encrypt = "yes",
  trust_server_certificate = "no",
  timeout = 30L,
  verbose = TRUE,
  ...
)
```
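| Argument | Description |
|---|---|
| server | Character. Microsoft Fabric SQL connection string, or a DSN-less Server=... string (which is normalized). |
| sql | Character scalar. The SQL to run. |
| database | Character. Database name. Defaults to "Lakehouse". |
| tenant_id | Character. Entra ID (Azure AD) tenant GUID. Defaults to Sys.getenv("FABRICQUERYR_TENANT_ID"). |
| client_id | Character. App registration (client) ID. Defaults to Sys.getenv("FABRICQUERYR_CLIENT_ID", unset = "04b07795-8ddb-461a-bbee-02f9e1bf7b46") (the Azure CLI's public client ID). |
| access_token | Optional character. If supplied, this bearer token is used instead of acquiring a new one with AzureAuth. |
| odbc_driver | Character. ODBC driver name. Defaults to getOption("fabricqueryr.sql.driver", "ODBC Driver 18 for SQL Server"). |
| port | Integer. TCP port. Defaults to 1433. |
| encrypt, trust_server_certificate | Character flags passed to ODBC. Default to "yes" and "no", respectively. |
| timeout | Integer. Login/connect timeout in seconds. Defaults to 30. |
| verbose | Logical. Emit progress messages. Defaults to TRUE. |
| ... | Additional arguments forwarded to the underlying connection call. |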
A tibble with the query results (0 rows if none).
```r
# Example is not executed since it requires configured credentials for Fabric
## Not run:
df <- fabric_sql_query(
  server = "2gxz...qiy.datawarehouse.fabric.microsoft.com",
  database = "Lakehouse",
  sql = "SELECT TOP 100 * FROM sys.objects",
  tenant_id = Sys.getenv("FABRICQUERYR_TENANT_ID"),
  client_id = Sys.getenv("FABRICQUERYR_CLIENT_ID")
)
dplyr::glimpse(df)
## End(Not run)
```