The vignette aims to show examples of using SparkR as an interface to run streaming Spark jobs through R - using the analysisPipelines package. The major use case is that of implementing a pipeline using SparkR dataframes for streaming data.
Using Spark as an engine requires the SparkR package to be installed. SparkR is distributed natively with Apache Spark and is not distributed on CRAN. The SparkR version needs to directly map to the Spark version (hence the native distribution), and care needs to be taken to ensure that this is configured properly.
To install from Github, run the following command, if you know the Spark version:
The other option is to install SparkR by running the following terminal commands if Spark has already been installed.
sys.setenv()
function.sparkPackages <- c("org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1")
sparkRSessionCreateIfNotPresent
functionThis example illustrates usage of pipelines for a streaming application. In this use case streaming data is read from Kafka, aggregations are performed and the output is written to the console.
Read streaming data from Kafka.
## Define these variables as per the configuration of your machine. The below example is just illustrative.
kafkaBootstrapServers <- "192.168.0.256:9092,192.168.0.257:9092,192.168.0.258:9092"
consumerTopic <- "topic1"
streamObj <- read.stream(source = "kafka", kafka.bootstrap.servers = kafkaBootstrapServers, subscribe = consumerTopic, startingOffsets="earliest")
printSchema(streamObj)
Users can define their own functions and use it as a part of the pipeline. These functions range from data prep, aggregations, casting data to suitable write stream format, etc.
# Function to convert datatype json struct to columns
convertStructToDf <- function(streamObj) {
streamObj <- SparkR::select(streamObj,list(getField(streamObj$`jsontostructs(value)`,"bannerId"),
getField(streamObj$`jsontostructs(value)`,"mobile"),
getField(streamObj$`jsontostructs(value)`,"homeAppliance"),
getField(streamObj$`jsontostructs(value)`,"gamingConsole"),
getField(streamObj$`jsontostructs(value)`,"accessories"),
getField(streamObj$`jsontostructs(value)`,"brand"),
getField(streamObj$`jsontostructs(value)`,"previousPrice"),
getField(streamObj$`jsontostructs(value)`,"currentPrice"),
getField(streamObj$`jsontostructs(value)`,"discount"),
getField(streamObj$`jsontostructs(value)`,"emi"),
getField(streamObj$`jsontostructs(value)`,"crossSale"),
getField(streamObj$`jsontostructs(value)`,"customerId"),
getField(streamObj$`jsontostructs(value)`,"ts"),
getField(streamObj$`jsontostructs(value)`,"click"),
getField(streamObj$`jsontostructs(value)`,"conversion"),
getField(streamObj$`jsontostructs(value)`,"age"),
getField(streamObj$`jsontostructs(value)`,"income"),
getField(streamObj$`jsontostructs(value)`,"maritalStatus"),
getField(streamObj$`jsontostructs(value)`,"segment")))
colnames(streamObj) <- c("bannerId","mobile","homeAppliance","gamingConsole","accessories","brand","previousPrice","currentPrice",
"discount","emi","crossSale","customerId","ts","click","conversion","age","income","maritalStatus","segment")
return(streamObj)
}
# Function to cast columns as string, integer, etc
castDfColumns <- function(streamObj) {
streamObj <- SparkR::selectExpr(streamObj, "bannerId","mobile","homeAppliance","gamingConsole","accessories","brand",
"CAST(previousPrice as INTEGER)","CAST(currentPrice as INTEGER)","CAST(discount as INTEGER)","emi",
"crossSale","customerId","ts","CAST(click as INTEGER)","CAST(conversion as INTEGER)",
"CAST(age as INTEGER)","CAST(income as INTEGER)","maritalStatus","segment")
streamObj$ts <- SparkR::to_timestamp(streamObj$ts,"yyyy-MM-dd HH:mm:ss")
return (streamObj)
}
# Function to convert datatype json struct to columns
convertDfToKafkaKeyValuePairs <- function (streamObj, kafkaKey) {
streamObj <- SparkR::toJSON(streamObj)
streamObj$key <- kafkaKey
return(streamObj)
}
# Function to summarize click stream data
globalUiMetrics <- function (streamObj) {
## Aggregation query
streamObj <- SparkR::summarize(SparkR::groupBy(streamObj,streamObj$bannerId),
impressions=count(streamObj$customerId),
clicks=sum(streamObj$click),
conversions=sum(streamObj$conversion))
SparkR::colnames(streamObj) <- c("banner_id","impressions","clicks","conversions")
return (streamObj)
}
In order to use pipelines, a pipeline object needs to be defined. Notice the Spark pipelines are defined using the readInputSpark
function.
Each user-defined function needs to be registered to the pipeline object. Post registration, the function can be used to construct a pipeline. A pipeline can be a pipeline of multiple functions called in a particular sequence.
# Define pipeline object
pipelineObj <- analysisPipelines::StreamingAnalysisPipeline(input = streamObj)
consumerDataSchema <- structType(structField("bannerId", "string"),
structField("mobile", "string"),
structField("homeAppliance", "string"),
structField("gamingConsole", "string"),
structField("accessories", "string"),
structField("brand", "string"),
structField("previousPrice", "string"),
structField("currentPrice", "string"),
structField("discount", "string"),
structField("emi", "string"),
structField("crossSale", "string"),
structField("customerId", "string"),
structField("ts", "string"),
structField("click", "string"),
structField("conversion", "string"),
structField("age", "string"),
structField("income", "string"),
structField("maritalStatus", "string"),
structField("segment", "string"))
# Register user-defined functions
registerFunction("convertStructToDf", "", functionType = "streaming", engine = "spark-structured-streaming")
registerFunction("castDfColumns", "", functionType = "streaming", engine = "spark-structured-streaming")
registerFunction("convertDfToKafkaKeyValuePairs", "", functionType = "streaming", engine = "spark-structured-streaming")
getRegistry()
# Define pipeline
# Do data prep
pipelineObj %>% castKafkaStreamAsString_sparkSS() %>%
convertKafkaValueFromJson_sparkSS(schema = consumerDataSchema, outAsIn = T) %>% convertStructToDf_sparkSS(outAsIn = T) %>% castDfColumns_sparkSS(outAsIn = T, storeOutput = T) -> pipelineObj
pipelineObj %>>% getPipeline
pipelineObj %>>% visualizePipeline
The pipeline is run by calling the generateOutput()
function. The output
attribute of the pipeline object contains the resultant Spark dataframe(s).
In this example the Spark DataFrames are converted to R dataframes to help understand the result.
Currently, streaming pipelines have the limitation that they are able to execute only linear flows as this constrained by Apache Spark Structured Streaming. Non-linear flows can be defined but might throw execution errors in runtime. Also, streaming pipelines can be implemented using only 1 engine i.e. Apache Spark Structured Streaming.