A Fancy (Numbers) Example

This notebook demonstrates the use of Spark Records in a scenario where we face the typical problems of big data in the real world: invalid inputs, complex business rules and code quality issues.

The material builds upon Spark Records by example and dives deeper into the capabilities of the Spark Records framework:

  • Ignoring some input data based on business rules

  • Custom metric collection and data quality checks

  • Capturing additional information about the data in the records' row-level logs

  • Known error identification

  • Root cause analysis in the presence of both known and unknown errors

  • Error mitigation strategies

Running this notebook

If you want to run this notebook, you first have to set up your environment. Skip this section if you prefer to treat the notebook as documentation.

  1. Get a Databricks account. If you are not a Databricks customer, you can create a free Databricks Community Edition account.

  2. Create a new cluster. (Clusters / Create Cluster) Note that Spark Records requires Spark version 2.x or higher and Scala 2.11.
    Tip: We strongly recommend NOT running Spark on Scala 2.10 because the reflection API, which Spark uses extensively, is not thread-safe. Occasionally, this leads to very strange behavior.

  3. Get a copy of the Spark Records JAR. You can build it yourself or grab the latest released version from https://bintray.com/swoop-inc/maven/spark-records.

  4. Import the Spark Records library. (Workspace / Create / Library) Add the Spark Records JAR and attach it to your cluster (or choose to have it automatically attached to all clusters).

  5. Import this notebook by using Import Notebook from this page or Workspace / Import and then providing the notebook's URL (https://swoop-inc.github.io/spark-records/fancy_numbers_example.html).

The fancy_numbers example

The test suite for Spark Records includes an example in the examples.fancy_numbers package whose goal is to categorize numbers. Because the test classes are not included in the Spark Records distribution, the following cell contains the entire code of Example.scala.

Note that we are using a package cell to make sure the classes are defined in the examples.fancy_numbers package, exactly as they would be in the Spark Records test suite.

Follow the comments in the code.

package examples.fancy_numbers

import com.swoop.spark.records._

/** Let's create a simple but not simplistic example of data processing with Spark.
  *
  * We have to categorize integers in the range [0, 100] in the following way:
  *
  * - Numbers outside of [0, 100] are errors.
  *
  * - 0s should be ignored.
  *
  * - Prime numbers are categorized as `prime`.
  *
  * - Perfect numbers, those that are equal to the sum of their proper divisors, e.g.,
  * 6 = 3 + 2 + 1, are categorized as `perfect`.
  *
  * - 13 is categorized as `bakers-dozen`.
  *
  * - Everything else is categorized as `even` or `odd`.
  *
  * This example is simple because the inputs are integers but it is not simplistic
  * because we have to deal with errors and inputs that should not produce any output.
  * Further, just to make things a little more interesting, we'll introduce a bug in our code.
  *
  * Here is how we'd like to represent our output data.
  *
  * @param n        the input integer
  * @param category the integer's category
  */
case class FancyNumber(n: Int, category: String)


/** Wrapping our data with a record is simple: we extend `Record[OurData, OurInput]` and
  * then provide defaults. Below you see the minimum set of fields you need in a record.
  * You can add more fields if you need more information in the wrapper. Common examples include
  * adding a timestamp, a record ID or schema versioning information.
  *
  * @see [[com.swoop.spark.records.Record]]
  * @see [[com.swoop.spark.records.Issue]]
  * @param features A bit mask that allows records to be easily filtered
  * @param data     The data the record wraps. May be empty in the case of some error records.
  * @param source   The source of the data. This is how we keep track of data provenance.
  *                 A common optimization is to only include the source for error records.
  * @param flight   Several jobs that work together to produce a result share the same flight ID.
  *                 How you choose to organize this is up to you. The ID is often a UUIDv4.
  * @param issues   The [[issues]] field contains the record-level "log file".
  */
case class FancyNumberRecord(
  features: Int,
  data: Option[FancyNumber] = None,
  source: Option[Int] = None,
  flight: Option[String] = None,
  issues: Option[Seq[Issue]] = None
) extends Record[FancyNumber, Int]


/** This is just a holder for our example code */
object Example {

  /** A record builder for a simple use case like the one we are dealing with should
    * extend [[RecordBuilder]].
    *
    * [[JobContext]] provides state management and other services during building.
    * In this example, we'll use it to collect some custom statistics.
    *
    * @see [[RecordBuilder]]
    * @see [[JobContext]]
    * @see [[BuildContext]]
    * @param n  the input integer to categorize
    * @param jc the job context
    */
  case class Builder(n: Int, override val jc: JobContext)
    extends RecordBuilder[Int, FancyNumber, FancyNumberRecord, JobContext](n, jc) {

    /** This is the main method we have to implement. Here we build an
      * [[examples.fancy_numbers.FancyNumber]] from the input.
      *
      * @return the record data generated for this input or `None`
      */
    def buildData: Option[FancyNumber] = {
      // Use throwError to throw _identifiable exceptions_, ones you can easily find
      // after the job has finished executing
      if (n < 0 || n > 100) throwError("input outside [0, 100]", n.toString, Err.OUT_OF_RANGE)

      // jc.inc() is how we can collect custom metrics during job execution
      val stat = if (n % 2 == 0) "even" else "odd"
      jc.inc(s"numbers.$stat")

      if (n == 0) None // returning None will skip this input and generate no record
      else Some(FancyNumber(n, category))
    }

    /** Returns a record with valid data.
      * This method is a hook to allow you to create your own custom records.
      * It uses the state that has been accumulated during the building of the record's data,
      * namely, `features` and `issues`.
      *
      * When everything is going well, we save on storage by not saving the source of the data.
      *
      * @param data   the data produced in [[buildData]]
      * @param issues the issues collected during the building process
      * @return A valid record with data
      */
    def dataRecord(data: FancyNumber, issues: Seq[Issue]): FancyNumberRecord =
      FancyNumberRecord(features, Some(data), None, jc.flight, issues)

    /** Returns an error record.
      * This method is called if the record will contain an error.
      * 
      * Depending on whether the error occurred during or after [[buildData]], the resulting
      * record may or may not have `data`. In our case `maybeData` will always be `None` 
      * because there is no code path where an error is generated after [[buildData]].
      *
      * @param issues the issues collected during the building process, at least one of
      *               which will be an error
      * @return A valid error record
      */
    def errorRecord(issues: Seq[Issue]): FancyNumberRecord =
      FancyNumberRecord(features, maybeData, Some(n), jc.flight, issues)

    /** Returns the category of the input number. */
    def category: String =
      if (primesTo100(n)) "prime"
      else if (n == 6 || n == 28) {
        // warn(), info() and debug() let you store messages inside the `issues` collection
        // associated with each record. Debug messages are particularly useful during
        // development. Features.QUALITY_CONCERN is one of the predefined record flags.
        warn("rare number", "perfect, really?", Features.QUALITY_CONCERN)
        "perfect"
      }
      else if (n == 13) "bakers-dozen"
      else if (n % 2 == 0) "even"
      else "odd"

  }

  /** One of the key principles of bulletproof big data processing is controlling failure modes
    * and ensuring that errors can be quickly separated into appropriate buckets, e.g., known vs.
    * unknown, safely ignorable vs. something one has to act on, etc.
    *
    * In Spark Records, the first step in doing this involves associating IDs with known/expected
    * error types.
    */
  object Err {
    /** The ID of the error raised when we get an out-of-range number. */
    val OUT_OF_RANGE = 1001
  }

  /** Integer IDs are great for big data processing but not great for comprehension.
    * Fear not: you can associate descriptive messages with error (and warning, info, debug)
    * IDs in the following manner. The Spark Records root cause analysis framework will
    * automatically make these available without adding them to the data.
    */
  Issue.addMessagesFor(Issue.Error, Map(
    Err.OUT_OF_RANGE -> "number out of range"
  ))

  /** This little bit of API sugar makes it easier to plug a builder into Spark's RDD and
    * Dataset APIs. It is not required but makes code simpler & cleaner.
    */
  def buildRecords[Ctx <: JobContext](inputs: TraversableOnce[Int], jc: Ctx)
  : Iterator[FancyNumberRecord] =
    inputs.flatMap(Builder(_, jc).build).toIterator

  /** A simple prime number algorithm that builds a boolean array telling us whether a
    * number is prime. We've introduced an off-by-one error in the implementation on
    * purpose to show how Spark Records deals with unexpected failures (as opposed to
    * out of range input, which is an expected failure).
    *
    * @see http://alvinalexander.com/text/prime-number-algorithm-scala-scala-stream-class
    */
  private lazy val primesTo100 = {
    def primeStream(s: Stream[Int]): Stream[Int] = Stream.cons(s.head, primeStream(s.tail filter {
      _ % s.head != 0
    }))

    val arr = Array.fill(100)(false) // bug: should be 101 because of 0-based indexing
    primeStream(Stream.from(2)).takeWhile(_ <= 100).foreach(arr(_) = true)
    arr
  }

}
Warning: classes defined within packages cannot be redefined without a cluster restart. Compilation successful.
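
Before wiring the builder into a Spark job, here is a quick scratch check (a hypothetical cell, not part of the original walkthrough) that runs the builder on a handful of inputs so you can see the kinds of records it produces. The scratchJc name and the chosen inputs are ours; the API calls are the same ones used later in this notebook.

import com.swoop.spark.records._
import examples.fancy_numbers._

// Scratch check (hypothetical): run the builder locally on a few inputs.
// -1 is out of range (error record), 0 is skipped (no record),
// 7 is prime, 13 is bakers-dozen, 28 is perfect (with a warning).
val scratchJc = SimpleDriverContext(sc).jobContext(SimpleJobContext)
Example.buildRecords(Seq(-1, 0, 7, 13, 28), scratchJc).foreach(println)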

Defining the Spark job

We'll represent our job as a simple function that, given some numbers and a table name, builds a dataset of FancyNumberRecords and saves it into the table. Immediately after that, we perform a data quality check requiring that Spark Records saw at least as many inputs as we provided and that the error rate is less than 1%.

Note that we are passing in the driver context. This way, if there is an exception during job execution, we can still query the driver context for accumulated statistics, etc.

import com.swoop.spark.records._
import examples.fancy_numbers._ 

def saveRecords(dc: SimpleDriverContext, numbers: Seq[Int], tableName: String) = {
  val jc = dc.jobContext(SimpleJobContext)

  spark.createDataset(numbers)
    .mapPartitions(inputs => Example.buildRecords(inputs, jc))
    .write.mode(SaveMode.Overwrite).saveAsTable(tableName)
  
  jc.checkDataQuality(minInputs=numbers.length, maxErrorRate=0.01, maxSkippedRate=0.001)
}
import com.swoop.spark.records._
import examples.fancy_numbers._
saveRecords: (dc: com.swoop.spark.records.SimpleDriverContext, numbers: Seq[Int], tableName: String)Unit

Running the job

We create the driver context in a separate cell for the same reason we pass it into saveRecords: in order to have it accessible in the case of an exception during cell execution.

val tableName = "fancy_numbers_neg5to100"
val dc = SimpleDriverContext(sc)
tableName: String = fancy_numbers_neg5to100
dc: com.swoop.spark.records.SimpleDriverContext = com.swoop.spark.records.SimpleDriverContext@5b2802cb

It's time to run the job.

saveRecords(dc, -5 to 100, tableName)
E00004: Too many errors. (inputs: 106, skipped: 1, errors: 6, withData: 99)

Our data quality check failed: there were too many errors. The error rate, #errors/#withData, is roughly 6%, far above the maxErrorRate of 1% we specified.
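
To make that concrete, here is the scratch arithmetic (not part of the original notebook), using the counts from the exception message above:

// Scratch arithmetic from the exception message: 6 errors against 99 data records
// is about 6.1%, well over the 1% maxErrorRate we passed to checkDataQuality.
val errorRate = 6.0 / 99 // ≈ 0.0606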

Root cause analysis

A good first step in root cause analysis is looking at the metrics collected during job execution.

dc.printStats
input.count: 106
issue.category.1: 6
issue.category.2: 2
issue.count: 8
issue.id.1-1001: 5
issue.id.2-1024: 2
numbers.even: 51
numbers.odd: 50
record.count: 105
record.data.count: 99
record.empty.count: 1
record.error.count: 6
record.features.0: 97
record.features.1: 6
record.features.2: 2
record.skipped.count: 1

Apart from numbers.even and numbers.odd, all other metrics were automatically collected by Spark Records.

To an experienced user of Spark Records, a few things immediately jump out in this output. There are 6 error issues (issue.category.1: 6, where Issue.Error.categoryId == 1), but only 5 of them are known errors (issue.id.1-1001: 5, i.e., 5 issues were errors with issue ID 1001, which is Example.Err.OUT_OF_RANGE).

Luckily, we don't have to remember category and issue IDs to figure this out. Spark Records provides many tools that make root cause analysis very fast.

Let's start by loading the job output and caching it to speed up queries.

Tip: If you are dealing with a lot of data, caching the entire dataset may not be feasible. However, in most cases, caching records.errorRecords would work equally well when you are drilling into unknown errors.

val records = spark.table("fancy_numbers_neg5to100").as[FancyNumberRecord].cache
records: org.apache.spark.sql.Dataset[examples.fancy_numbers.FancyNumberRecord] = [features: int, data: struct<n: int, category: string> ... 3 more fields]
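
As a first drill-down, here is a hedged sketch (not from the original notebook) that tallies error issues by ID directly from the cached records. It assumes the issues column exposes category and id fields, as the metric names above suggest.

import org.apache.spark.sql.functions._

// Tally error issues by ID (category 1 == Issue.Error.categoryId).
// The `category` and `id` field names on Issue are assumed from the metrics above.
records
  .withColumn("issue", explode($"issues"))
  .where($"issue.category" === 1)
  .groupBy($"issue.id")
  .count()
  .show()

If the schema matches, five of the six errors should fall under ID 1001 (Example.Err.OUT_OF_RANGE), leaving one error without a known ID to dig into.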