Writing a Data Frame
In this topic you will store application data in the federated database using Spark SQL. You will create a set of 1000 fictitious customer instances in an RDD, convert the RDD to a data frame, then save the data to the federated database.
How a Data Frame Writes to a Federated Database
Before executing the program that creates the customer data and stores it in the federated database, let’s examine the code to understand what it is doing. The application is written in Scala, which is a common language for interacting with Spark.
This section will cover some highlights as a refresher, but it is assumed that you have some experience working with Spark SQL. If this is not the case, refer to the Spark documentation and the quick start guide on the official Spark site (spark.apache.org) before continuing.
Defining Elements
Open src\main\scala\com\thingspan\spark\demo\CreateCustomers.scala in your preferred editor.
After the required Spark imports, you see that the CreateCustomers object defines several data members that will be used for creating fictitious Customer instances. Note that the schema for customers is provided by the Customer case class.
object CreateCustomers {
  val maleNames = ...
  ...
  case class Customer(customerId: Int,
    firstName: String,
    lastName: String,
    age: Short,
    sex: String,
    zipCode: Int,
    points: Long)
Recall that a key difference between an RDD and a data frame is that a data frame carries a schema.
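As an illustration (not part of the tutorial code), converting an RDD of Customer instances to a data frame derives a schema from the case-class fields, which Spark can then print. The snippet assumes the Spark context and the sqlContext.implicits._ import that are set up below.
// Illustrative only: an RDD has no schema of its own; toDF() derives one
// from the fields of the Customer case class.
val sampleRDD = sparkContext.parallelize(Seq(
  Customer(1, "Bob", "Smith", 30, "M", 94100, 5000L)))
val sampleDF = sampleRDD.toDF()
sampleDF.printSchema()   // lists customerId, firstName, ... with their types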
Initializing Spark
Now let’s look at the code that initializes Spark.
In the main program, Spark SQL APIs create a Spark configuration for a job named CreateCustomers:
def main(args: Array[String]) {
  ..
  val sparkConf = new SparkConf()
    .setAppName("CreateCustomers")
    ...
Note that main accepts any number of String arguments; this allows the boot file for the federated database to be passed in at runtime. In later sections, Spark SQL query strings are passed in the same way.
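The argument handling itself is elided in the listing, but it is typically along these lines (a sketch; the usage message is an assumption, although bootFile matches the variable used in the write call later on):
// Sketch only: take the first command-line argument as the boot file path.
if (args.length < 1) {
  System.err.println("Usage: CreateCustomers <bootFilePath>")
  System.exit(1)
}
val bootFile = args(0)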
The Spark configuration is used to construct a Spark context, which is the main entry point for Spark functionality.
val sparkContext = new SparkContext(sparkConf)
As a refresher, a Spark context represents a connection to a Spark cluster, and it is used to create resilient distributed datasets (RDDs) on that cluster. An RDD is the basic abstraction in Spark and is an immutable collection of elements that can be operated on in parallel. An RDD can be converted to a data frame.
The SparkContext is then used to create an SQLContext, which allows you to create data frames and execute SQL queries.
...
val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
import sqlContext.implicits._
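For example, once a data frame such as the customerDF created later in this program is registered as a temporary table, the SQLContext can run SQL against it. The following snippet is illustrative and is not part of CreateCustomers.
// Illustrative only: register a data frame as a temporary table and query it
// with Spark SQL (Spark 1.x API).
customerDF.registerTempTable("customers")
val youngCustomers = sqlContext.sql(
  "SELECT firstName, lastName FROM customers WHERE age < 21")
youngCustomers.show()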
Creating the Customer Data Frame
Let’s look at the code that creates the customer data and writes it to the Objectivity/DB federated database. The code creates a parallelized collection of 1000 customers and splits the dataset into 5 partitions. Recall that Spark will run one task for each partition.
val numCustomers = 1000
val customerIdRDD = sparkContext.parallelize(1 to numCustomers, 5)
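If you want to confirm the partitioning (this check is not part of the tutorial code), the RDD reports its partition count directly:
// Illustrative check: parallelize was asked to create 5 partitions.
println(s"Partitions: ${customerIdRDD.partitions.length}")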
The numbers from 1 to 1000 serve as the customer IDs. With the help of a random number generator, the fictitious customer instances are created in an RDD: within each partition, the customer IDs are mapped to full Customer instances with names, ages, zip codes, and so forth.
val customerRDD = customerIdRDD.mapPartitions({ x =>
  val r = scala.util.Random
  x.map { id =>
    Customer(
      id,
      maleNames(r.nextInt(maleNames.length)),
      surnames(r.nextInt(surnames.length)),
      (15 + r.nextInt(65)).toShort,
      "M",
      (94000 + r.nextInt(2000)),
      (5000 * r.nextInt(200)).toLong)
  }
}, true)
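Using mapPartitions rather than map means the random number generator is obtained once per partition instead of once per customer. If you want to spot-check the generated data before writing it, a quick sample (not part of the tutorial code) can be pulled back to the driver:
// Illustrative only: inspect a few generated customers on the driver.
customerRDD.take(3).foreach(println)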
Writing the Data to the Objectivity/DB Federated Database
In order to write the customer data to the federated database, the RDD must first be converted to a data frame. That data frame is then written in Objectivity format to the federated database with the given boot file.
val customerDF = customerRDD.toDF()
...
customerDF.write.
  mode(SaveMode.Overwrite).
  format("com.objy.spark.sql").
  option("objy.bootFilePath", bootFile).
  option("objy.dataClassName", "com.thingspan.spark.demo.Customer").
  ...
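For reference, a data-frame write of this kind ends with a call to save(). The following is only a sketch: the format and the two options shown above come from the tutorial, while any additional options hidden by the ellipsis are not reproduced here.
// Sketch of a complete write call; the elided tutorial options are omitted.
customerDF.write.
  mode(SaveMode.Overwrite).
  format("com.objy.spark.sql").
  option("objy.bootFilePath", bootFile).
  option("objy.dataClassName", "com.thingspan.spark.demo.Customer").
  save()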
On completion, the Spark context is stopped.
sparkContext.stop
For more information about options for the Objectivity/DB Spark Adapter (for the data frame writer), see Creating and Updating Objects and Relationships.
Creating Customers
In this section, you will use a Gradle task to create a number of Customer objects.
1. Look at the createCustomers task in the build.gradle file in the ObjySparkTutorial directory to understand what it’s doing.
task createCustomers(type: Exec) {
  description 'Creates 1000 Customer objects in the federated database.'
  group = tutorialGroup
    executable = ... 
    args "-c", "${SPARK_HOME}/bin/spark-submit --master local \
      --conf spark.executor.extraLibraryPath=${THINGSPAN_HOME}/lib \
      --conf spark.driver.extraLibraryPath=${THINGSPAN_HOME}/lib \
      --class com.thingspan.spark.demo.CreateCustomers ${ROOT_DIR}/build/libs/ObjySparkTutorial.jar \
      ${ROOT_DIR}/data/customers.boot"
}
The two lines after the description and group are standard Gradle syntax for executing a command-line process. The work of the task centers on the spark-submit command, which launches the application (ObjySparkTutorial.jar) on a local cluster, running the CreateCustomers class. Two configuration options point to the lib directory of the ThingSpan installation, which contains the Objectivity/DB Spark Adapter dependency JAR and other libraries. The name of the boot file is passed as a runtime argument to provide access to the federated database.
2. Run the task as follows:
gradlew createCustomers
The federated database is populated with the customer objects. The program output includes a printout of the schema for the Customer objects that were added:
|-- customerId: integer (nullable = false)
|-- firstName: string (nullable = true)
|-- lastName: string (nullable = true)
|-- age: short (nullable = false)
|-- sex: string (nullable = true)
|-- zipCode: integer (nullable = false)
|-- points: long (nullable = false)
3. Look inside the data directory and note that additional database files (.DBs) have been created.
Verifying Results
Let’s confirm that our customer objects are stored in the federated database. Included with the tutorial application is a class called RunQuery for querying the federated database using Spark.
RunQuery accepts a query string and passes it to the Spark driver program, which opens an Objectivity data frame for reading, executes the provided query, and prints the results.
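Although the exact code is in RunQuery.scala, the read side follows the same data-source pattern as the write, roughly as sketched below. The option names mirror those used for the write, and queryString stands for the query passed in at runtime; both the structure and the option set are assumptions here, so check the source file for the actual details.
// Rough sketch of an Objectivity-format read followed by a Spark SQL query.
// RunQuery's actual code may use additional or different options.
val customersDF = sqlContext.read.
  format("com.objy.spark.sql").
  option("objy.bootFilePath", bootFile).
  option("objy.dataClassName", "com.thingspan.spark.demo.Customer").
  load()
customersDF.registerTempTable("customers")
sqlContext.sql(queryString).show()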
1. Open src\main\scala\com\thingspan\spark\demo\RunQuery.scala and notice that this class creates a data frame in Objectivity format that reads data from a federated database.
2. Look at the runQuery task in the build.gradle file and examine the SQL query string that will be passed to the application:
\"SELECT customers.firstName, customers.lastName FROM customers WHERE customers.firstName = 'Daz'\" \
You can see that the query searches for customers whose first name is Daz.
3. Run the runQuery task as follows:
gradlew runQuery
Observe the output to see the customers with the first name Daz.
4. Use the objy administrative tool to issue a Declarative Objectivity (DO) statement that returns all attributes for customers whose first name is Daz:
objy DO -outputFormat table -boot data\customers.boot -statement "From com.thingspan.spark.demo.Customer WHERE firstName == 'Daz' RETURN *"
Examine the results and note that the fields do not include a city. This will be of interest later in the tutorial.
For more information about the DO language syntax, see Declarative Objectivity (DO) Language. For help with the DO runner, issue the following command:
objy DO -help