Pipeable SQL

John Mount

2022-02-28

Introduction

rquery is a query generator for R. It is based on Edgar F. Codd’s relational algebra plus experience using SQL and dplyr at big data scale. The design represents an attempt to make SQL more teachable by denoting composition by a sequential pipeline notation instead of nested queries or functions. The implementation delivers reliable high performance data processing on large data systems such as Spark and databases. Package features include: data processing trees or pipelines as observable objects (able to report both columns produced and columns used), optimized SQL generation as an explicit user visible modeling step, convenience methods for applying query trees to in-memory data.frames, and low direct package dependencies.

Pipeable SQL

SQL is a very powerful data processing (or data engineering) grammar. Data scientists are well advised to learn to work with SQL.

An inessential difficulty in using SQL is that SQL represents composition of operations by nesting, which can rapidly become confusing and illegible. This can be overcome by using a query composer such as rquery (other query composers exist as well).

Let’s set up our environment so we can work with examples.

run_vignette <- requireNamespace("DBI", quietly = TRUE) && 
  requireNamespace("RSQLite", quietly = TRUE)
library("rquery")
library("wrapr")

# example database connection
db <- DBI::dbConnect(RSQLite::SQLite(),
                     ":memory:")
RSQLite::initExtension(db)

dbopts <- rq_connection_tests(db)
print(dbopts)
## $rquery.SQLiteConnection.use_DBI_dbListFields
## [1] TRUE
## 
## $rquery.SQLiteConnection.use_DBI_dbRemoveTable
## [1] TRUE
## 
## $rquery.SQLiteConnection.use_DBI_dbExecute
## [1] TRUE
## 
## $rquery.SQLiteConnection.create_temporary
## [1] FALSE
## 
## $rquery.SQLiteConnection.control_temporary
## [1] TRUE
## 
## $rquery.SQLiteConnection.control_temporary_view
## [1] FALSE
## 
## $rquery.SQLiteConnection.control_rownames
## [1] TRUE
## 
## $rquery.SQLiteConnection.use_DBI_dbExistsTable
## [1] TRUE
## 
## $rquery.SQLiteConnection.check_logical_column_types
## [1] TRUE
## 
## $rquery.SQLiteConnection.use_DROP_TABLE_IF_EXISTS
## [1] TRUE
## 
## $rquery.SQLiteConnection.expr_map
## $rquery.SQLiteConnection.expr_map$MOD
## $rquery.SQLiteConnection.expr_map$MOD[[1]]
## [1] "("
## 
## $rquery.SQLiteConnection.expr_map$MOD[[2]]
## [1] 3
## 
## $rquery.SQLiteConnection.expr_map$MOD[[3]]
## [1] "%"
## 
## $rquery.SQLiteConnection.expr_map$MOD[[4]]
## [1] 5
## 
## $rquery.SQLiteConnection.expr_map$MOD[[5]]
## [1] ")"
## 
## 
## $rquery.SQLiteConnection.expr_map$rand
## $rquery.SQLiteConnection.expr_map$rand[[1]]
## [1] "ABS"
## 
## $rquery.SQLiteConnection.expr_map$rand[[2]]
## [1] "("
## 
## $rquery.SQLiteConnection.expr_map$rand[[3]]
## [1] "("
## 
## $rquery.SQLiteConnection.expr_map$rand[[4]]
## [1] "RANDOM"
## 
## $rquery.SQLiteConnection.expr_map$rand[[5]]
## [1] "("
## 
## $rquery.SQLiteConnection.expr_map$rand[[6]]
## [1] ")"
## 
## $rquery.SQLiteConnection.expr_map$rand[[7]]
## [1] "%"
## 
## $rquery.SQLiteConnection.expr_map$rand[[8]]
## [1] "268435456"
## 
## $rquery.SQLiteConnection.expr_map$rand[[9]]
## [1] ")"
## 
## $rquery.SQLiteConnection.expr_map$rand[[10]]
## [1] "/"
## 
## $rquery.SQLiteConnection.expr_map$rand[[11]]
## [1] "268435455.0"
## 
## $rquery.SQLiteConnection.expr_map$rand[[12]]
## [1] ")"
options(dbopts)

# copy in example data
rq_copy_to(
  db, 'd',
  data.frame(v = c(1, -5, 3)),
  temporary = FALSE,
  overwrite = TRUE)
## [1] "mk_td(\"d\", c( \"v\"))"
##    v
## 1  1
## 2 -5
## 3  3
# produce a handle to existing table
d <- db_td(db, "d")

d is a “table description” which is just the name of a table and the names of expected columns. d does not store data or a database reference (making it safe to serialize/de-serialize). All rquery operation trees or pipelines must start either with a table description or a data.frame. We will discuss table descriptions later.

Note: in examples we use rq_copy_to() to create data. This is only for the purpose of having easy portable examples. With big data the data is usually already in the remote database or Spark system. The task is almost always to connect and work with this pre-existing remote data, and the method to do this is db_td(), which builds a reference to a remote table given the table name. The suggested pattern for working with remote tables is to get inputs via db_td() and land remote results with materialize(). To work with local data one can copy data from memory to the database with rq_copy_to() and bring back results with execute() (though be aware that operating on remote non-memory data is rquery’s primary intent).

For our first example we will introduce a new column and perform a calculation using this column. This is achieved in SQL by writing code in one of two styles: defining the first new column twice (once to land the value and once to use), or sequencing two queries by nesting. We will demonstrate both methods.

The define-the-column-twice solution looks like the following.

DBI::dbGetQuery(db, "
  SELECT
    *,
    ABS(v) AS absv,
    ABS(v) - v AS delta
  FROM
    d
")
##    v absv delta
## 1  1    1     0
## 2 -5    5    10
## 3  3    3     0

In SQL the column absv is not available for calculation in the same query in which it is produced.

The nested method looks like the following: we produce the column absv in one query and then wrap that in another query that uses the column. For expressions longer than ABS(v) this is the preferred solution (until one moves to something like common table expressions).

DBI::dbGetQuery(db, "
  SELECT
    *,
    absv - v AS delta
  FROM (
    SELECT
      *,
      ABS(v) AS absv
    FROM
      d
  ) subtab
")
##    v absv delta
## 1  1    1     0
## 2 -5    5    10
## 3  3    3     0
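
For comparison, here is a minimal sketch of the common table expression form mentioned above. It is not part of the rquery workflow. It assumes the DBI and RSQLite packages are installed and rebuilds the small example table on its own connection (the name con is introduced here only for illustration).

```r
library("DBI")

# self-contained copy of the example table (assumes RSQLite is installed)
con <- DBI::dbConnect(RSQLite::SQLite(), ":memory:")
DBI::dbWriteTable(con, "d", data.frame(v = c(1, -5, 3)))

# name the inner query once, then use its columns in the outer query
cte_res <- DBI::dbGetQuery(con, "
  WITH subtab AS (
    SELECT
      *,
      ABS(v) AS absv
    FROM
      d
  )
  SELECT
    *,
    absv - v AS delta
  FROM
    subtab
")
DBI::dbDisconnect(con)
print(cte_res)
```

The WITH clause reads top to bottom rather than inside out, but each step still has to be named and stitched together by hand, which is the bookkeeping a query composer takes over.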

sql_node()

Using rquery we can write the SQL composition using pipe notation (where composition is written as x %.>% f %.>% g instead of g(f(x))). We are going to use wrapr dot-pipe instead of the magrittr pipe to pick up a neat feature we will use later (all other examples will work with the magrittr pipe). The “%.>%” glyph can be bound to a keyboard shortcut for convenience.

The rquery realization of the above calculation is as follows:

op_tree <- d %.>%
  sql_node(., "absv" := "ABS(v)") %.>%
  sql_node(., "delta" := "absv - v")
execute(db, op_tree)
##   absv delta  v
## 1    1     0  1
## 2    5    10 -5
## 3    3     0  3

The above is what we call “piped SQL” and represents a major convenience for users as the details of how to compose the statements are left to the package. The sql_node() is a very powerful node. We will use it in our first few examples and then move on to more convenient higher level relational nodes.

We can view the SQL translation of the operations tree as follows:

cat(to_sql(op_tree, db))
SELECT
 `absv` AS `absv`,
 absv - v AS `delta`,
 `v` AS `v`
FROM (
 SELECT
  `v` AS `v`,
  ABS(v) AS `absv`
 FROM (
  SELECT
   `v`
  FROM
   `d`
 ) tsql_50685977084084802819_0000000000
) tsql_50685977084084802819_0000000001

Notice the above translations did not add identifier quotes to our use of “v” in “ABS(v)”. This is because the SQL expression is not parsed in R. If we want to identify terms as variables we can wrap them with as.name() or quote() to get the quoting (and other variable oriented features). The extra SELECT step to pull data from the inner table is used by rquery for important column narrowing steps, and can actually improve query performance.

op_tree <- d %.>%
  sql_node(., "absv" := list(list("ABS(", quote(v), ")"))) %.>%
  sql_node(., "delta" := list(list(quote(absv),"-", quote(v))))
cat(to_sql(op_tree, db))
SELECT
 `absv` AS `absv`,
 `absv` - `v` AS `delta`,
 `v` AS `v`
FROM (
 SELECT
  `v` AS `v`,
  ABS( `v` ) AS `absv`
 FROM (
  SELECT
   `v`
  FROM
   `d`
 ) tsql_58457302067231811449_0000000000
) tsql_58457302067231811449_0000000001

The list(list()) notation is how we say in R that we have a single element list (i.e. one expression) that is built up as a list of terms. The marking notation is cumbersome, but is not needed when we move on to relational nodes, which are parsed in R and can spot identifiers without additional help.
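
As a rough sketch of what the relational-node version might look like, the same two-step calculation can be written with extend(), one of the relational nodes named later in this document. The table description is built by hand with mk_td() so that no database connection is needed. Treat this as an illustration under those assumptions rather than the canonical form.

```r
library("rquery")
library("wrapr")

# hand-built description of the example table (no database needed)
td <- mk_td("d", c("v"))

# relational nodes take R expressions, so no quote()/list() marking is needed
ops <- td %.>%
  extend(., absv := abs(v)) %.>%
  extend(., delta := absv - v)

print(column_names(ops))
cat(to_sql(ops, rquery::rquery_default_db_info()))
```

Because the expressions are parsed in R, the column names should appear with identifier quotes in the generated SQL without any extra marking.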

op_tree itself is an object with its own presentation format:

cat(format(op_tree))
## mk_td("d", c(
##   "v")) %.>%
##  sql_node(.,
##           absv %:=% ABS( v ),
##              *=TRUE) %.>%
##  sql_node(.,
##           delta %:=% absv - v,
##              *=TRUE)

The op_tree supplies a number of important summaries about the proposed query:

column_names(op_tree)
## [1] "absv"  "delta" "v"
tables_used(op_tree)
## [1] "d"
columns_used(op_tree)
## $d
## [1] "v"

Composing nodes

We can add nodes to an op_tree to build larger operator trees (or pipelines).

op_tree2 <- op_tree %.>%
  sql_node(., "prod" := "absv * delta")

cat(format(op_tree2))
## mk_td("d", c(
##   "v")) %.>%
##  sql_node(.,
##           absv %:=% ABS( v ),
##              *=TRUE) %.>%
##  sql_node(.,
##           delta %:=% absv - v,
##              *=TRUE) %.>%
##  sql_node(.,
##           prod %:=% absv * delta,
##              *=TRUE)

However, one does not have to use the string notation: wrapr supplies the helper functions qe() (quote expression), qae() (quote assignment expressions), and qc() (quoting concatenate).

op_tree3 <- op_tree %.>%
  sql_node(., qae(prod = absv * delta))

cat(format(op_tree3))
## mk_td("d", c(
##   "v")) %.>%
##  sql_node(.,
##           absv %:=% ABS( v ),
##              *=TRUE) %.>%
##  sql_node(.,
##           delta %:=% absv - v,
##              *=TRUE) %.>%
##  sql_node(.,
##           prod %:=% absv * delta,
##              *=TRUE)

The op_tree record keeping can also be used to catch potential errors early in pipeline construction. For example, if we try to refer to a non-existent variable when adding an operator we get a thrown exception (note: a sql_node() being added must have its variables marked as above for pre-checking to occur; relational nodes get this checking automatically).

op_tree4 <- op_tree %.>%
  sql_node(., "z" := list(list("1 + ", quote(z))))
## Error in sql_node.relop(., `:=`("z", list(list("1 + ", quote(z))))): rquery::sql_node.relop undefined columns: z

However, early error checking is not currently available with the qae() notation, because the input does not mark which parts of the expression are values (versus operators or function names).

op_tree4 <- op_tree %.>%
  sql_node(., qae(z = 1 + z))

A non-trivial example

We can express non-trivial operations in sql_node()s. For example, we can build a node that calculates, for each row, how many columns contain NA/NULL, as is demonstrated here.

# load up example data
d2 <- rq_copy_to(
  db, 'd2',
  data.frame(v1 = c(1, 2, NA, 3),
             v2 = c(NA, "b", NA, "c"),
             v3 = c(NA, NA, 7, 8),
             stringsAsFactors = FALSE))

# look at table
execute(db, d2)
##   v1   v2 v3
## 1  1 <NA> NA
## 2  2    b NA
## 3 NA <NA>  7
## 4  3    c  8
# get list of columns
vars <- column_names(d2)
print(vars)
## [1] "v1" "v2" "v3"
# build a NA/NULLs per-row counting expression.
# names are "quoted" by wrapping them with as.name().
# constants can be quoted by an additional list wrapping.
expr <- lapply(vars,
               function(vi) {
                 list("+ (CASE WHEN (",
                      as.name(vi),
                      "IS NULL ) THEN 1.0 ELSE 0.0 END)")
               })
expr <- unlist(expr, recursive = FALSE)
expr <- c(list(0.0), expr)
cat(paste(unlist(expr), collapse = " "))
## 0 + (CASE WHEN ( v1 IS NULL ) THEN 1.0 ELSE 0.0 END) + (CASE WHEN ( v2 IS NULL ) THEN 1.0 ELSE 0.0 END) + (CASE WHEN ( v3 IS NULL ) THEN 1.0 ELSE 0.0 END)
# instantiate the operator node
op_tree_count_null <- d2 %.>%
  sql_node(., "num_missing" := list(expr))
cat(format(op_tree_count_null))
## mk_td("d2", c(
##   "v1",
##   "v2",
##   "v3")) %.>%
##  sql_node(.,
##           num_missing %:=% 0 + (CASE WHEN ( v1 IS NULL ) THEN 1.0 ELSE 0.0 END) + (CASE WHEN ( v2 IS NULL ) THEN 1.0 ELSE 0.0 END) + (CASE WHEN ( v3 IS NULL ) THEN 1.0 ELSE 0.0 END),
##              *=TRUE)
# examine produced SQL
sql <- to_sql(op_tree_count_null, db)
cat(sql)
## SELECT
##  `v3` AS `v3`,
##  0 + (CASE WHEN ( `v1` IS NULL ) THEN 1.0 ELSE 0.0 END) + (CASE WHEN ( `v2` IS NULL ) THEN 1.0 ELSE 0.0 END) + (CASE WHEN ( `v3` IS NULL ) THEN 1.0 ELSE 0.0 END) AS `num_missing`,
##  `v1` AS `v1`,
##  `v2` AS `v2`
## FROM (
##  SELECT
##   `v1`,
##   `v2`,
##   `v3`
##  FROM
##   `d2`
## ) tsql_16202085721465195382_0000000000
# execute
execute(db, op_tree_count_null)
##   v3 num_missing v1   v2
## 1 NA           2  1 <NA>
## 2 NA           1  2    b
## 3  7           2 NA <NA>
## 4  8           0  3    c

And, as this is an important capability, this exact functionality is wrapped in count_null_cols().

# whole process wrapped in convenience node
d2 %.>%
  count_null_cols(., vars, "nnull") %.>%
  execute(db, .)
##   v3 nnull v1   v2
## 1 NA     2  1 <NA>
## 2 NA     1  2    b
## 3  7     2 NA <NA>
## 4  8     0  3    c

Working with sets of columns

There are some helper methods to apply a parameterized SQL expression to a set of columns.

# vector of columns we want to work on
colset <- qc(v1, v2, v3)
# build new names we want as results
colterms <- paste0(colset, "_isNA") := colset
map_to_char(colterms)
## [1] "c('v1_isNA' = 'v1', 'v2_isNA' = 'v2', 'v3_isNA' = 'v3')"
# build an apply expression to set of columns query 
s_tree <- d2 %.>%
  sql_expr_set(., colterms, 
               "CASE WHEN . IS NULL THEN 1 ELSE 0 END")
cat(to_sql(s_tree, db))
## SELECT
##  v1 AS `v1`,
##  v2 AS `v2`,
##  v3 AS `v3`,
##  CASE WHEN `v1` IS NULL THEN 1 ELSE 0 END AS `v1_isNA`,
##  CASE WHEN `v2` IS NULL THEN 1 ELSE 0 END AS `v2_isNA`,
##  CASE WHEN `v3` IS NULL THEN 1 ELSE 0 END AS `v3_isNA`
## FROM (
##  SELECT
##   `v1`,
##   `v2`,
##   `v3`
##  FROM
##   `d2`
## ) tsql_17260264765870997694_0000000000
execute(db, s_tree)
##   v1   v2 v3 v1_isNA v2_isNA v3_isNA
## 1  1 <NA> NA       0       1       1
## 2  2    b NA       0       0       1
## 3 NA <NA>  7       1       1       0
## 4  3    c  8       0       0       0

SQL first

rquery is a “SQL first” system. It is designed to create SQL queries and dispatch them to remote systems (SQLite, Spark, PostgreSQL, Redshift, and other databases) for execution. The execute() method can be used with big data by adding a table_name argument (or also by using the materialize() method) to land results in a remote table instead of pulling them back to R.
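
A minimal sketch of landing a result remotely with materialize() follows. An in-memory SQLite connection stands in for a remote database, and the result table name "d_absv" is a hypothetical name chosen for this illustration. It assumes the rquery, wrapr, DBI, and RSQLite packages are installed.

```r
library("rquery")
library("wrapr")

# in-memory SQLite standing in for a remote database
db <- DBI::dbConnect(RSQLite::SQLite(), ":memory:")
d <- rq_copy_to(db, "d", data.frame(v = c(1, -5, 3)),
                temporary = FALSE, overwrite = TRUE)

op_tree <- d %.>%
  sql_node(., "absv" := "ABS(v)")

# land the result in the database; "d_absv" is a hypothetical table name
res_td <- materialize(db, op_tree, table_name = "d_absv")

# only a table description comes back to R, the rows stay in the database
print(column_names(res_td))
n_rows <- DBI::dbGetQuery(db, "SELECT COUNT(*) AS n FROM d_absv")$n
print(n_rows)
DBI::dbDisconnect(db)
```

The returned table description can be used as the start of a further pipeline, so multi-stage work can proceed without pulling intermediate rows into R.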

The mantra of SQL-first is data starts in the database, and stays in the database (i.e., it is too large to depend on round-tripping through R). Another important SQL-first package is cdata which provides pure SQL based implementations of operators that generalize pivot/un-pivot, cast/melt, or spread/gather.

The better the database implementation the better rquery will be, both in terms of performance and in terms of function (such as the availability of SQL window functions).

Ad-hoc mode

As a convenience rquery can work with in-memory data.frames by sending them to the SQL service provider. This provider defaults to RSQLite or can be set by setting the global option rquery.rquery_db_executor. We demonstrate this below.

old_o <- options(list("rquery.rquery_db_executor" = list(db = db)))

data.frame(v = -2:2) %.>%
  execute(., op_tree)
##   absv delta  v
## 1    2     4 -2
## 2    1     2 -1
## 3    0     0  0
## 4    1     0  1
## 5    2     0  2

When using the wrapr dot pipe the above can be abbreviated as:

data.frame(v = -2:2) %.>% op_tree
##   absv delta  v
## 1    2     4 -2
## 2    1     2 -1
## 3    0     0  0
## 4    1     0  1
## 5    2     0  2

The above calculation is managed by wrapr dot pipe S3 wrapr_function extensions.

rquery operators can be used directly (without any table description nodes) when working with in-memory data.frames.

data.frame(x = 5) %.>% sql_node(., "z" := "sqrt(x)")
##   x        z
## 1 5 2.236068

The above calculation is triggered by S3 override of any of print(), as.data.frame() and head(). Remote tables need an execute() or materialize() step to specify the database connection.

Table descriptions

rquery table descriptions are simple objects that store only the name of a table and expected columns. Any local data or database table that has at least the set of columns named in the table description can be used in a given rquery pipeline.

The table description “d2” used in the examples was returned by rq_copy_to() when it moved data into the database, while “d” was built from an existing database table with db_td(). One can also build a description by hand with mk_td(), or describe local or in-memory data.frames with local_td().
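
The two connection-free constructors can be sketched as follows. The data.frame local_df and its extra column are made up for this illustration.

```r
library("rquery")

# describe a table by hand: just a name and the expected columns
td_by_hand <- mk_td("d", c("v"))
print(column_names(td_by_hand))

# describe an in-memory data.frame; the description records the column names
local_df <- data.frame(v = c(1, -5, 3), extra = c("a", "b", "c"))
td_local <- local_td(local_df)
print(column_names(td_local))
```

Neither call touches a database, which is what makes table descriptions safe to build ahead of time and to serialize.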

Helper functions and notation

Using wrapr::qae(), wrapr::qe(), the bquote()-.() notation, and a new rquery-.[] notation can make working with [sql_node()](https://winvector.github.io/rquery/reference/sql_node.html)s much easier (though for many applications I prefer to work with the relational nodes such as extend(), project(), and so on).

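The ideas are: qe() and qae() quote expressions and assignment expressions, .() substitutes in the value of an R variable (as in bquote()), and .[] marks a symbol as a column name. This allows for the following.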

library("rquery")

date_cutoff <- '2017-04-02'

td <- mk_td("df", 
            c("cust",
              "trans_date",
              "sales"))

# misspelling not caught (argh!)
tryCatch({
  ops <- td %.>%
  select_rows_se(
    ., 
    qe(trans_date <=  str_to_date(.(date_cutoff), '%Y-%m-%d'))) %.>%
  sql_node(
    .,
    qae(max_date = max(trans_datez)),  # trans_date misspelled
    mods = "GROUP BY cust",
    orig_columns = F)
  },
  error = function(e) { print(e) })


# misspelling caught
tryCatch({
  ops <- td %.>%
  select_rows_se(
    ., 
    qe(trans_date <=  str_to_date(.(date_cutoff), '%Y-%m-%d'))) %.>%
  sql_node(
    .,
    qae(max_date = max(.[trans_datez])),  # trans_date misspelled
    mods = "GROUP BY cust",
    orig_columns = F)
  },
  error = function(e) { print(e) })
## <simpleError in sql_node.relop(., qae(max_date = max(.[trans_datez])), mods = "GROUP BY cust",     orig_columns = F): rquery::sql_node.relop undefined columns: trans_datez>
ops <- td %.>%
  select_rows_se(
    ., 
    qe(trans_date <=  str_to_date(.(date_cutoff), '%Y-%m-%d'))) %.>%
  sql_node(
    .,
    qae(max_date = max(.[trans_date])),
    mods = "GROUP BY cust",
    orig_columns = F)

cat(to_sql(ops, rquery::rquery_default_db_info()))
## SELECT
##  max( "trans_date" ) AS "max_date"
## FROM (
##  SELECT * FROM (
##   SELECT
##    "cust",
##    "trans_date",
##    "sales"
##   FROM
##    "df"
##  ) tsql_15576039787127429537_0000000000
##  WHERE "trans_date" <= str_to_date ( '2017-04-02' , '%Y-%m-%d' )
## ) tsql_15576039787127429537_0000000001 GROUP BY cust

The .[]-notation is just signalling to rquery which symbols are column names (without requiring rquery to fully parse the SQL fragments). The rquery relational nodes get this sort of checking without any additional notation as they do fully parse the R expressions prior to any SQL translation.
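
To illustrate the claim about relational nodes, here is a hedged sketch of the same grouped maximum written with project() in place of sql_node(). It assumes project() accepts a groupby argument as used below, and it checks only that an error is raised, not the wording of the message.

```r
library("rquery")
library("wrapr")

td <- mk_td("df", c("cust", "trans_date", "sales"))

# the misspelled column should be rejected when the node is built,
# with no .[] marking needed
res <- tryCatch(
  td %.>%
    project(., max_date := max(trans_datez),  # trans_date misspelled
            groupby = "cust"),
  error = function(e) { e })
print(inherits(res, "error"))

# the correctly spelled version builds without complaint
ops <- td %.>%
  project(., max_date := max(trans_date), groupby = "cust")
print(column_names(ops))
```

The check happens at pipeline construction time, before any SQL is generated or sent to a database.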

We can combine .() and .[] for even more powerful expressions such as the following.

library("rquery")

date_cutoff <- '2017-04-02'

td <- mk_td("df", 
            c("cust",
              "trans_date",
              "sales"))

COL_TO_MAX = as.name("trans_date")
NEW_COL = paste0("max_", COL_TO_MAX)
GROUP_COL = "cust"

ops <- td %.>%
  select_rows_se(
    ., 
    qe(trans_date <=  str_to_date(.(date_cutoff), '%Y-%m-%d'))) %.>%
  sql_node(
    .,
    qae(.(NEW_COL) := max(.[.(COL_TO_MAX)])),
    mods = paste("GROUP BY", GROUP_COL),
    orig_columns = F)

cat(to_sql(ops, rquery::rquery_default_db_info()))
## SELECT
##  max( "trans_date" ) AS "max_trans_date"
## FROM (
##  SELECT * FROM (
##   SELECT
##    "cust",
##    "trans_date",
##    "sales"
##   FROM
##    "df"
##  ) tsql_08309843820851199015_0000000000
##  WHERE "trans_date" <= str_to_date ( '2017-04-02' , '%Y-%m-%d' )
## ) tsql_08309843820851199015_0000000001 GROUP BY cust

Conclusion

rquery is a new package, but it is already proving to be correct (avoiding known data processing issues) and performant. For working with R at a big data scale (say using PostgreSQL or Spark) rquery is the right specialized tool for specifying data manipulation.

Appendix: Always clean up on the way out

options(old_o)
DBI::dbDisconnect(db)