
Advanced Root Cause Analysis

By following the patterns of Spark Records, you gain the benefit of lightning-fast root cause analysis even if your records were built with tooling other than the one provided by the framework.

This notebook demonstrates how this can be done using Python and SparkSQL. We will analyze the output of the FancyNumbers job from this notebook in three steps:

  1. Prepare job output for analysis.

  2. Generate issue summaries.

  3. Drill down to find the exact location of unknown errors.

Beyond showing how to do root cause analysis directly against records, this notebook also demonstrates how to customize the analysis to your particular job and input/output data.

Tip: At Swoop we reuse common root cause analysis operations by putting them in a shared notebook that we simply %include into the notebook we are currently working in.

Prepare job output for analysis

We grab the data we need and map it to a standard temporary view name, records_to_investigate, so that we don't have to change our SparkSQL every time we want to look at a new dataset.

We also print the schema of the records because it helps us write queries faster. Note that every record may have 0 or more issues, each of which can have 0 or more causes, each of which has a stack of many source locations. This is how Spark Records captures extremely detailed exception information with nested causes and full stack traces.

%python 
spark.table("fancy_numbers_neg5to100").createOrReplaceTempView("records_to_investigate")
spark.table("records_to_investigate").printSchema()
root
 |-- features: integer (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- n: integer (nullable = true)
 |    |-- category: string (nullable = true)
 |-- source: integer (nullable = true)
 |-- flight: string (nullable = true)
 |-- issues: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- category: integer (nullable = true)
 |    |    |-- message: string (nullable = true)
 |    |    |-- causes: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- message: string (nullable = true)
 |    |    |    |    |-- stack: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- className: string (nullable = true)
 |    |    |    |    |    |    |-- methodName: string (nullable = true)
 |    |    |    |    |    |    |-- fileName: string (nullable = true)
 |    |    |    |    |    |    |-- lineNumber: integer (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- details: string (nullable = true)
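
As a quick sanity check on this nesting, here is a minimal sketch that counts how many issues each record carries, using the records_to_investigate view created above (num_issues is just an illustrative column name):

%python
from pyspark.sql import functions as F

# Count records by the number of attached issues.
# Records without issues may have a null issues array, which we count as 0.
(spark.table("records_to_investigate")
  .select(F.when(F.col("issues").isNull(), 0)
           .otherwise(F.size("issues")).alias("num_issues"))
  .groupBy("num_issues")
  .count()
  .orderBy("num_issues")
  .show())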

Generate an issue summary

A good first step is to get an overview summary of all the issues in the saved data. We want to see:

  • category: the issue category
  • id: the issue ID
  • cnt: a count of how many issues of this type we've seen
  • messages: a summary of all distinct messages associated with this issue ID (there could be more than one in some cases)
  • sources: a summary of all distinct inputs associated with the issue.

Tip: When we write custom root cause analysis code we can really tune it for quick comprehension. Our source summaries depend on the fact that we are dealing with a small input set and that the inputs themselves are small (just integers). In a complex real-world example, it's better to include just a sample source (using first() instead of collect_set()).
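
For example, a minimal sketch of that sampling variant might look like the following; sample_source is just an illustrative column name, and the full collect_set-based summary we actually run is the %sql cell below:

%python
# Hypothetical sampling variant of the summary below: keep one sample input per issue
# instead of collecting the full set of distinct inputs.
spark.sql("""
  select
    issue.category,
    issue.id,
    count(*)                        as cnt,
    collect_set(issue.message)      as messages,
    first(coalesce(source, data.n)) as sample_source
  from records_to_investigate
  lateral view explode(issues) exploded as issue
  where features <> 0
  group by issue.category, issue.id
  order by issue.category, issue.id
""").show(truncate=False)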

%sql
select 
  issue.category,
  issue.id,
  count(*)                              as cnt,
  collect_set(issue.message)            as messages,
  collect_set(coalesce(source, data.n)) as sources -- we only save sources for errors but we know the number is also in data.n
from records_to_investigate
lateral view explode(issues) exploded as issue
where features <> 0 -- we filter to records that have any feature flag set
group by issue.category, issue.id
order by issue.category, issue.id
category | id   | cnt | messages                                           | sources
1        | null | 1   | ["java.lang.ArrayIndexOutOfBoundsException: 100"]  | [100]
1        | 1001 | 5   | ["E01001: input outside [0, 100]"]                 | [-3,-2,-5,-1,-4]
2        | 1024 | 2   | ["W01024: rare number"]                            | [6,28]

We have the output we expect. It shows two types of errors (five for the out-of-range numbers -5..-1 and one for the index-out-of-bounds exception caused by the off-by-one bug in the code) and two warnings for the perfect numbers.

However, there is something unsatisfying about the output: we have to decode the meaning of category and id from their numeric values, which is not great. With a bit of work we can do much better.

Issue ID descriptions

Because the mapping of issue IDs to descriptions changes infrequently, we can build a table with the information and join it against our root cause analysis tables.

We should not forget to include the default, framework-provided IDs. We can collect them with one line of Scala or by reading the source code.

import com.swoop.spark.records._

Seq(Issue.Error, Issue.Warning, Issue.Info, Issue.Debug).map(o => (o.featureMask, o.idMessages)).foreach(println)
(1,Map(0 -> unknown, 1 -> internal error, 2 -> unsupported schema, 3 -> missing required argument, 4 -> data quality check failed))
(2,Map(0 -> unknown))
(4,Map(0 -> unknown))
(8,Map(0 -> unknown))
import com.swoop.spark.records._

With a few lines of Python we map this information, together with our own application-specific IDs, into a table. Again, we'll create a temp view in order to save on rewriting SQL later.

Tip: This can be done ahead of time or as part of a shared "prologue" for your root cause analysis toolkit.

%python
# Dictionary of issue categories mapping to issue IDs mapping to messages associated with these IDs
id_messages = {
  1 : { # error
    0: "unknown",
    1: "internal error",
    2: "unsupported schema",
    3: "missing required parameter",
    4: "data quality check failed",
    # from our fancy numbers example
    1001: "negative number"
  },
  2 : { # warning
    0: "unknown"
  },
  4 : { # info
    0: "unknown"
  },
  8 : { # debug
    0: "unknown"
  }
}

# Flatten the above into an array of dictionaries
rows = []
for category, messages in id_messages.items():
  for issue_id, id_message in messages.items():
    rows.append({"category": category, "id": issue_id, "id_message": id_message})

# Save the mapping into a table
# We may get a warning: "inferring schema from dict is deprecated". Yes, but it is rather expedient in this case. :-)
spark.createDataFrame(rows).write.mode("overwrite").saveAsTable("fancy_numbers_id_messages")

# Create a temp view with a more standard name so that we can use %sql cells in the notebook
spark.table("fancy_numbers_id_messages").createOrReplaceTempView("records_id_messages")

Now that we're ready, let's get more detail on just the errors.

%sql
with
issues as (
  select 
    issue.category,
    issue.id,
    count(*)                   as cnt,
    collect_set(issue.message) as messages,
    collect_set(source)        as sources
  from records_to_investigate
  lateral view explode(issues) exploded as issue
  where (features & 1) <> 0
  group by issue.category, issue.id
)
select
  lhs.id, id_message, cnt, messages, sources
from issues as lhs
-- Use a left outer join in order not to remove rows without an entry in records_id_messages
left outer join records_id_messages rhs
on lhs.category = rhs.category and lhs.id = rhs.id
order by cnt desc
id   | id_message      | cnt | messages                                           | sources
1001 | negative number | 5   | ["E01001: input outside [0, 100]"]                 | [-3,-2,-5,-1,-4]
null | null            | 1   | ["java.lang.ArrayIndexOutOfBoundsException: 100"]  | [100]

We did it. We have the issue descriptions.

Tip: You can use the same pattern to join other metadata about known issues. This is particularly useful when you'd like to filter out expected errors, which you would only want to be concerned about if they exceed a certain threshold. For example, at Swoop we work with many thousands of publishers who sometimes integrate us into their websites and mobile apps incorrectly. The data generated from bad integrations is invalid and generates error records. However, because the problem is typically self-correcting, our data quality checks allow for many such errors before an alert is triggered.
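
A minimal sketch of that pattern, assuming a hypothetical records_expected_errors table (with columns category, id, max_expected) that lists how many occurrences of each known issue we are willing to tolerate:

%python
# Hypothetical thresholds for known, self-correcting errors; anything not listed
# is treated as unexpected (threshold 0).
expected = [
    {"category": 1, "id": 1001, "max_expected": 10}  # e.g., tolerate up to 10 out-of-range inputs
]
spark.createDataFrame(expected).createOrReplaceTempView("records_expected_errors")

# Surface only the issue types whose counts exceed their allowed threshold.
spark.sql("""
  with issues as (
    select issue.category, issue.id, count(*) as cnt
    from records_to_investigate
    lateral view explode(issues) exploded as issue
    where (features & 1) <> 0
    group by issue.category, issue.id
  )
  select lhs.*
  from issues lhs
  left outer join records_expected_errors rhs
    on lhs.category = rhs.category and lhs.id <=> rhs.id
  where lhs.cnt > coalesce(rhs.max_expected, 0)
""").show(truncate=False)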

Find the source of unexpected errors

The output above shows us one unexpected error: the index out of bounds exception. We want to find out what's causing it.

Because exceptions are sometimes thrown from deep within framework code, e.g., the dreaded null dereference errors (NullPointerException in Java, None.get in Scala, NoneType errors in Python), it would speed up our root cause analysis if we were to separate the exception origin (where it was raised) from the exception cause in our code. The exception cause in our code is the line that invoked whatever caused the exception. In other words, it is the first element on the stack trace from our codebase. In this particular example, this is any code whose className is in the examples.fancy_numbers package.

Pulling this information out in SparkSQL without resorting to UDFs is not pretty, but it is also not very difficult. Keep in mind that, because of the consistency that comes with following the schema of Spark Records, you only have to write something of this complexity once; you can then reuse it both across failed runs of the same job and across many other jobs with no or only minor tweaks.

Tip: If you are dealing with very big data and you frequently need to perform this type of root cause analysis, you can meaningfully speed up the process by writing a custom UDF that processes the nested arrays of causes and stack elements directly. Here we wanted to stick to SparkSQL because it can be applied in any programming language and environment.
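
As a minimal sketch of what such a UDF could look like in PySpark (assuming the record schema shown earlier; first_our_location and OUR_PACKAGE are illustrative names, not part of Spark Records):

%python
from pyspark.sql import functions as F
from pyspark.sql import types as T

OUR_PACKAGE = "examples.fancy_numbers."

# The return type mirrors a single stack trace element from the records schema.
stack_element = T.StructType([
    T.StructField("className", T.StringType()),
    T.StructField("methodName", T.StringType()),
    T.StructField("fileName", T.StringType()),
    T.StructField("lineNumber", T.IntegerType())
])

def _first_our_location(issues):
    # Walk issues -> causes -> stack and return the first stack element from our code
    # for unknown errors (category 1, null id), mirroring the SQL cell below.
    for issue in (issues or []):
        if issue.category != 1 or issue.id is not None:
            continue
        for cause in (issue.causes or []):
            for el in (cause.stack or []):
                if el.className and el.className.startswith(OUR_PACKAGE):
                    return (el.className, el.methodName, el.fileName, el.lineNumber)
    return None

first_our_location = F.udf(_first_our_location, stack_element)

(spark.table("records_to_investigate")
  .where("(features & 1) <> 0")
  .withColumn("our_location", first_our_location("issues"))
  .where("our_location is not null")
  .groupBy("our_location")
  .count()
  .orderBy(F.desc("count"))
  .show(truncate=False))

In this notebook we stick to the pure SparkSQL version below; the UDF is only an optimization to reach for at scale.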

%sql 
with 
error_records as (
  -- Filter records to ones containing error issues only.
  select *,
    -- Assign unique row IDs to the records, since we'll need to partition the data when we look for the first stack trace position in our code.
    -- We wouldn't need this if Spark allowed unsorted windows, e.g., lag(col) over (). 
    -- Normally, unsorted windows don't make sense in distributed systems but they do make sense with rows generated by LATERAL VIEW as there is implicit linear ordering there
    -- Here we use a random UUIDv4 by invoking the static randomUUID() method of java.util.UUID
    reflect('java.util.UUID','randomUUID') as row_id
  from records_to_investigate
  where (features & 1) <> 0 -- records marked as having errors only
),
all_locations as (
  -- Explode issues, causes and stack trace elements.
  -- Keep only the first element of a stack trace or the elements from our code (class names in the examples.fancy_numbers package)
  select *,
    -- Find the stack trace position of the previous stack element.
    -- The result will be null for the first stack element and 0 for the first stack element in our code after the first stack element
    lag(stack_pos) over (
      partition by row_id, issue_pos, cause_pos 
      order by stack_pos)                                      as prev_stack_pos,
    -- Find the class name of the previous stack element.
    -- As with prev_stack_pos, the result will be null for the first stack element
    lag(stack.className) over (
      partition by row_id, issue_pos, cause_pos 
      order by stack_pos)                                      as prev_class_name
  from error_records
  lateral view posexplode(issues) exploded_issues              as issue_pos, issue
  lateral view posexplode(issue.causes) exploded_causes        as cause_pos, cause
  lateral view posexplode(cause.stack) exploded_stack_elements as stack_pos, stack
  where issue.category = 1  -- error issues
    and issue.id is null    -- unknown ones
    and (    stack_pos = 0  -- first stack element
          or stack.className regexp 'examples\.fancy_numbers\.') -- or a stack element from our code
),
selected_locations as (
  -- Filter locations to just the exception origin or the first location in our code.
  -- Handle the special case where our code may be first in the stack trace.
  select *,
    if(prev_stack_pos is null, stack, null)                             as exception_origin,
    if(stack.className regexp 'examples\.fancy_numbers\.', stack, null) as our_location
  from all_locations
  where (prev_stack_pos is null or prev_stack_pos = 0)
    and (prev_class_name is null or (not prev_class_name regexp 'examples\.fancy_numbers\.'))
)
-- Summarize the results.
-- We group by the first stack trace location in our code.
-- We summarize the exception origins (there could be more than one if the line in our code did multiple things) 
select 
  count(distinct row_id)        as cnt_records,          -- number of records where the exception is at this location in our code
  first(our_location, true)     as our_location,         -- first stack trace element in our code
  collect_set(issue.message)    as messages,             -- summary of exception messages at this location
  collect_set(exception_origin) as exception_locations,  -- unique exception origins
  collect_set(source)           as sources,              -- summary of unique inputs causing the exception
  first(named_struct(
    'features', features,
    'source', source,
    'flight', flight,
    'data', data,
    'issues', issues
  ))                            as sample_record         -- record sample
from selected_locations
group by our_location
having our_location is not null
order by cnt_records desc
1{"className":"examples.fancy_numbers.Example$Builder","methodName":"category","fileName":"<driver>","lineNumber":123}["java.lang.ArrayIndexOutOfBoundsException: 100"][{"className":"examples.fancy_numbers.Example$Builder","methodName":"category","fileName":"<driver>","lineNumber":123}][100]{"features":1,"source":100,"flight":"edc4baaf-8545-45aa-a907-d1fadff8dec8","data":null,"issues":[{"category":1,"message":"java.lang.ArrayIndexOutOfBoundsException: 100","causes":[{"message":"100","stack":[{"className":"examples.fancy_numbers.Example$Builder","methodName":"category","fileName":"<driver>","lineNumber":123},{"className":"examples.fancy_numbers.Example$Builder","methodName":"buildData","fileName":"<driver>","lineNumber":92},{"className":"com.swoop.spark.records.RecordBuilder","methodName":"captureData","fileName":"RecordBuilder.scala","lineNumber":213},{"className":"com.swoop.spark.records.RecordBuilder$$anonfun$build$3","methodName":"apply","fileName":"RecordBuilder.scala","lineNumber":199},{"className":"com.swoop.spark.records.RecordBuilder$$anonfun$build$3","methodName":"apply","fileName":"RecordBuilder.scala","lineNumber":199},{"className":"scala.util.Try$","methodName":"apply","fileName":"Try.scala","lineNumber":192},{"className":"com.swoop.spark.records.RecordBuilder","methodName":"build","fileName":"RecordBuilder.scala","lineNumber":199},{"className":"examples.fancy_numbers.Example$$anonfun$buildRecords$1","methodName":"apply","fileName":"<driver>","lineNumber":163},{"className":"examples.fancy_numbers.Example$$anonfun$buildRecords$1","methodName":"apply","fileName":"<driver>","lineNumber":163}]}],"id":null,"details":null}]}