%python
# Register the job output as a temp view so that we can use %sql cells, then inspect its schema
spark.table("fancy_numbers_neg5to100").createOrReplaceTempView("records_to_investigate")
spark.table("records_to_investigate").printSchema()
root
|-- features: integer (nullable = true)
|-- data: struct (nullable = true)
| |-- n: integer (nullable = true)
| |-- category: string (nullable = true)
|-- source: integer (nullable = true)
|-- flight: string (nullable = true)
|-- issues: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- category: integer (nullable = true)
| | |-- message: string (nullable = true)
| | |-- causes: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- message: string (nullable = true)
| | | | |-- stack: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- className: string (nullable = true)
| | | | | | |-- methodName: string (nullable = true)
| | | | | | |-- fileName: string (nullable = true)
| | | | | | |-- lineNumber: integer (nullable = true)
| | |-- id: integer (nullable = true)
| | |-- details: string (nullable = true)
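Before summarizing, it can be useful to see how many records carry each kind of issue. The cell below is a small illustrative sketch, not part of the original notebook: it tests the features bit mask against the standard masks (1 = error, 2 = warning, 4 = info, 8 = debug) that the Scala cell further down prints, and assumes only the records_to_investigate view registered above.
%python
# Sketch (assumption: the records_to_investigate view registered above).
# Count records per issue feature flag using bitwise tests against the
# standard masks 1 (error), 2 (warning), 4 (info) and 8 (debug).
from pyspark.sql import functions as F

records = spark.table("records_to_investigate")
masks = {"errors": 1, "warnings": 2, "infos": 4, "debugs": 8}
records.select([
    F.sum((F.col("features").bitwiseAND(F.lit(mask)) != 0).cast("int")).alias(name)
    for name, mask in masks.items()
]).show()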
%sql
select issue.category, issue.id, count(*) as cnt,
  collect_set(issue.message) as messages,
  -- we only save sources for errors but we know the number is also in data.n
  collect_set(coalesce(source, data.n)) as sources
from records_to_investigate
  lateral view explode(issues) exploded as issue
where features <> 0 -- we filter to records that have any feature flag set
group by issue.category, issue.id
order by issue.category, issue.id
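If you prefer to stay in Python, the same summary can also be produced with the DataFrame API. The cell below is an equivalent sketch, not part of the original notebook, assuming only the records_to_investigate view registered earlier.
%python
# Sketch: DataFrame API equivalent of the %sql issue summary above.
from pyspark.sql import functions as F

(spark.table("records_to_investigate")
    .where(F.col("features") != 0)  # records that have any feature flag set
    .select("source", "data", F.explode("issues").alias("issue"))
    .groupBy(F.col("issue.category").alias("category"), F.col("issue.id").alias("id"))
    .agg(
        F.count(F.lit(1)).alias("cnt"),
        F.collect_set("issue.message").alias("messages"),
        # we only save sources for errors but we know the number is also in data.n
        F.collect_set(F.coalesce(F.col("source"), F.col("data.n"))).alias("sources"))
    .orderBy("category", "id")
    .show(truncate=False))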
import com.swoop.spark.records._

Seq(Issue.Error, Issue.Warning, Issue.Info, Issue.Debug)
  .map(o => (o.featureMask, o.idMessages))
  .foreach(println)
(1,Map(0 -> unknown, 1 -> internal error, 2 -> unsupported schema, 3 -> missing required argument, 4 -> data quality check failed))
(2,Map(0 -> unknown))
(4,Map(0 -> unknown))
(8,Map(0 -> unknown))
import com.swoop.spark.records._
%python
# Dictionary of issue categories mapping to issue IDs mapping to messages associated with these IDs
id_messages = {
    1: {  # error
        0: "unknown",
        1: "internal error",
        2: "unsupported schema",
        3: "missing required parameter",
        4: "data quality check failed",
        # from our fancy numbers example
        1001: "negative number"
    },
    2: {  # warning
        0: "unknown"
    },
    4: {  # info
        0: "unknown"
    },
    8: {  # debug
        0: "unknown"
    }
}

# Flatten the above into a list of dictionaries
rows = []
for category, messages in id_messages.items():
    for issue_id, id_message in messages.items():
        rows.append({"category": category, "id": issue_id, "id_message": id_message})

# Save the mapping into a table
# We may get a warning: "inferring schema from dict is deprecated". Yes, but it is rather expedient in this case. :-)
spark.createDataFrame(rows).write.mode("overwrite").saveAsTable("fancy_numbers_id_messages")

# Create a temp view with a more standard name so that we can use %sql cells in the notebook
spark.table("fancy_numbers_id_messages").createOrReplaceTempView("records_id_messages")
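As an optional sanity check, not in the original flow, you can list the lookup table before joining against it:
%python
# Optional: verify the category/ID/message mapping we just saved
spark.table("records_id_messages").orderBy("category", "id").show(truncate=False)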
%sql
with issues as (
  select issue.category, issue.id, count(*) as cnt,
    collect_set(issue.message) as messages,
    collect_set(source) as sources
  from records_to_investigate
    lateral view explode(issues) exploded as issue
  where (features & 1) <> 0
  group by issue.category, issue.id
)
select lhs.id, id_message, cnt, messages, sources
from issues as lhs
  -- Use a left outer join in order not to remove rows without an entry in records_id_messages
  left outer join records_id_messages rhs
    on lhs.category = rhs.category and lhs.id = rhs.id
order by cnt desc
%sql
with error_records as (
  -- Filter records to ones containing error issues only.
  select *,
    -- Assign unique row IDs to the records, since we'll need to partition the data
    -- when we look for the first stack trace position in our code.
    -- We wouldn't need this if Spark allowed unsorted windows, e.g., lag(col) over ().
    -- Normally, unsorted windows don't make sense in distributed systems but they do make sense
    -- with rows generated by LATERAL VIEW as there is implicit linear ordering there.
    -- Here we use a random UUIDv4 by invoking the static randomUUID() method of java.util.UUID.
    reflect('java.util.UUID', 'randomUUID') as row_id
  from records_to_investigate
  where (features & 1) <> 0 -- records marked as having errors only
),
all_locations as (
  -- Explode issues, causes and stack trace elements.
  -- Keep only the first element of a stack trace or all the elements from our code
  -- (class names from the examples.fancy_numbers package).
  select *,
    -- Find the stack trace position of the previous stack element.
    -- The result will be null for the first stack element and 0 for the first stack element
    -- in our code after the first stack element.
    lag(stack_pos) over (
      partition by row_id, issue_pos, cause_pos
      order by stack_pos) as prev_stack_pos,
    -- Find the class name of the previous stack element.
    -- As with prev_stack_pos, the result will be null for the first stack element.
    lag(stack.className) over (
      partition by row_id, issue_pos, cause_pos
      order by stack_pos) as prev_class_name
  from error_records
    lateral view posexplode(issues) exploded_issues as issue_pos, issue
    lateral view posexplode(issue.causes) exploded_causes as cause_pos, cause
    lateral view posexplode(cause.stack) exploded_stack_elements as stack_pos, stack
  where issue.category = 1 -- error issues
    and issue.id is null -- unknown ones
    and (
      stack_pos = 0 -- first stack element
      or stack.className regexp 'examples\.fancy_numbers\.') -- or a stack element from our code
),
selected_locations as (
  -- Filter locations to just the exception origin or the first location in our code.
  -- Handle the special case where our code may be first in the stack trace.
  select *,
    if(prev_stack_pos is null, stack, null) as exception_origin,
    if(stack.className regexp 'examples\.fancy_numbers\.', stack, null) as our_location
  from all_locations
  where (prev_stack_pos is null or prev_stack_pos = 0)
    and (prev_class_name is null or (not prev_class_name regexp 'examples\.fancy_numbers\.'))
)
-- Summarize the results.
-- We group by the first stack trace location in our code.
-- We summarize the exception origins (there could be more than one if the line in our code did multiple things).
select count(distinct row_id) as cnt_records, -- number of records where the exception is at this location in our code
  first(our_location, true) as our_location, -- first stack trace element in our code
  collect_set(issue.message) as messages, -- summary of exception messages at this location
  collect_set(exception_origin) as exception_locations, -- unique exception origins
  collect_set(source) as sources, -- summary of unique inputs causing the exception
  first(named_struct(
    'features', features,
    'source', source,
    'flight', flight,
    'data', data,
    'issues', issues
  )) as sample_record -- record sample
from selected_locations
group by our_location
having our_location is not null
order by cnt_records desc
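Once the summary points at a suspicious input, you may want to see the complete stack traces recorded for it. The sketch below is not part of the original notebook; the source value -3 is a hypothetical example of a failing input, so substitute one of the values reported in the sources columns above.
%python
# Sketch: inspect the full stack traces recorded for one failing input.
# The source value below (-3) is a hypothetical example.
from pyspark.sql import functions as F

(spark.table("records_to_investigate")
    .where(F.col("source") == -3)
    .select("source", F.explode("issues").alias("issue"))
    .select("source", F.col("issue.message").alias("issue_message"),
            F.explode("issue.causes").alias("cause"))
    .select("source", "issue_message", F.col("cause.message").alias("cause_message"),
            F.explode("cause.stack").alias("frame"))
    .select("source", "issue_message", "cause_message",
            "frame.className", "frame.methodName", "frame.fileName", "frame.lineNumber")
    .show(truncate=False))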
Advanced Root Cause Analysis
By following the patterns of Spark Records you gain the benefits of lightning-fast root cause analysis even if your records were built using tooling different from the one provided by the framework.
This notebook demonstrates how this can be done using Python and Spark SQL. We will analyze the output of the FancyNumbers job from this notebook in three steps:
1. Prepare job output for analysis.
2. Generate issue summaries.
3. Drill down to find the exact location of unknown errors.
Beyond showing how to do root cause analysis directly against records, this notebook also demonstrates how to customize the analysis to your particular job & input/output data.
Tip: At Swoop we reuse common root cause analysis operations by putting them in a shared notebook that we simply %include into the notebook we are currently working in.