Processors - Row By Row

Quality Processors allow Quality rules to be used on a JVM outside of Spark execution. Spark is still required for expression resolution and compilation, so the pattern of usage is:

import com.sparkutils.quality.sparkless.ProcessFunctions._
case class InputData(fields)

val sparkSession = SparkSession.builder().
  config("spark.master", s"local[1]").
  config("spark.ui.enabled", false).getOrCreate()

registerQualityFunctions() // optional

try {
  val ruleSuite = // get rulesuite
  import sparkSession.implicits._
  // thread safe to share
  val processorFactory = dqFactory[InputData](ruleSuite)
  
  // in other threads an instance is needed
  val threadSpecificProcessor = processorFactory.instance
  try {
    val dqResults: RuleSuiteResult = threadSpecificProcessor(new InputData(...))
  } finally {
    // when your thread is finished doing work close the instance
    threadSpecificProcessor.close()
  }
  
} finally {
  sparkSession.stop()
}

Stateful expressions ruin the fun

Given the comment about "no Spark execution", why is a sparkSession present? The Spark infrastructure is used to compile code, which requires a running Spark session (or task) to obtain configuration and to access the implicits for encoder derivation. IF the rules do not include stateful expressions (why would they?) and you use the default compilation, without Spark's higher order functions, this is also possible:

import com.sparkutils.quality.sparkless.ProcessFunctions._
case class InputData(fields)

val sparkSession = SparkSession.builder().
  config("spark.master", s"local[1]").
  config("spark.ui.enabled", false).getOrCreate()
val ruleSuite = // get rulesuite
import sparkSession.implicits._
// thread safe to share
val processorFactory = dqFactory[InputData](ruleSuite)

sparkSession.stop()

// in other threads an instance is needed
val threadSpecificProcessor = processorFactory.instance
try {
  threadSpecificProcessor.initialize(partitionId) // Optional, see the Partitions note below
  val dqResults: RuleSuiteResult = threadSpecificProcessor(new InputData(...))
} finally {
  // when your thread is finished doing work close the instance
  threadSpecificProcessor.close()
}

The bold IF above is ominous, why the caveat? Stateful expressions using compilation are fine, as the state handling is moved into the compiled code. If, however, the expressions are "CodegenFallback" and run in interpreted mode, then each thread needs its own state. The same is true when using compile = false as a parameter, so it's recommended to stick with the defaults and avoid stateful expressions such as monotonically_increasing_id, rand or unique_id.

Similarly, Spark's Higher Order Functions such as transform always require re-compilation. Later Spark versions or a custom compilation handler can remedy this. Using Quality managed user functions is fine as long as they too don't use Spark's Higher Order Functions.

If the rules are free of such stateful expressions then the .instance function is nothing more than a call to a constructor on pre-compiled code.

In short, given that stateful expressions can provide different answers for the same inputs, they are best avoided unless you really need that behaviour.

Thread Safety

For all combinations of ProcessorFactory the factory itself is thread safe and may be shared; the instances themselves are not, as they use mutable state to achieve performance.

Partitions / initialize?

Despite all the above commentary on stateful expressions being awkward to use, if you do choose to use them you should call the initialize function with a unique integer parameter for each thread, as sketched below.

If you are not using stateful expressions you don't need to call initialize.
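
For example, a sketch of sharing one factory across a thread pool, where each thread creates and closes its own instance and initialises it with a unique id (the executor wiring is illustrative; only the factory and instance calls come from the API above):

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

val nextPartitionId = new AtomicInteger(0)
val pool = Executors.newFixedThreadPool(4)

(1 to 4).foreach { _ =>
  pool.submit(new Runnable {
    def run(): Unit = {
      // the factory is shared; each thread needs its own instance
      val processor = processorFactory.instance
      try {
        // only required when stateful expressions are used
        processor.initialize(nextPartitionId.getAndIncrement())
        val dqResults = processor(new InputData(...))
        // ... handle dqResults
      } finally {
        processor.close()
      }
    }
  })
}
pool.shutdown()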

Encoders and Input types

The output types of all the runners are well-defined but, like the input types, rely on Spark Encoders to abstract from the actual types.

For simple beans it's enough to use Spark's Encoders.bean(Class[_]) to derive an encoder or, when using Scala, Frameless encoder derivation.
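
For example, a minimal sketch of deriving a bean encoder and making it available to dqFactory (assuming, as the implicits imports above suggest, that dqFactory resolves an implicit Encoder; InputBean is purely illustrative):

import org.apache.spark.sql.{Encoder, Encoders}

// an illustrative Java-style bean with a no-arg constructor and getter/setter
class InputBean {
  private var product: String = _
  def getProduct: String = product
  def setProduct(p: String): Unit = product = p
}

implicit val inputBeanEnc: Encoder[InputBean] = Encoders.bean(classOf[InputBean])

val beanFactory = dqFactory[InputBean](ruleSuite) // picks up the implicit encoder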

Java Lists and Maps need special care

Using Java lists or maps with Encoders.bean often doesn't work, as the generic type information isn't available to the Spark code.

In Spark 3.4 and above you can use AgnosticEncoders instead and specify the types.

What about something more interesting like an Avro message?

import java.io.ByteArrayOutputStream
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

val testOnAvro = SchemaBuilder.record("testOnAvro")
  .namespace("com.teston")
  .fields()
  .requiredString("product")
  .requiredString("account")
  .requiredInt("subcode")
  .endRecord()
val datumWriter = new GenericDatumWriter[GenericRecord](testOnAvro)

val bos = new ByteArrayOutputStream()
val enc = EncoderFactory.get().binaryEncoder(bos, null)

// serialise each input row as Avro binary
val avroTestData = testData.map { d =>
  val r = new GenericData.Record(testOnAvro)
  r.put("product", d.product)
  r.put("account", d.account)
  r.put("subcode", d.subcode)
  datumWriter.write(r, enc)
  enc.flush()
  val ba = bos.toByteArray
  bos.reset()
  ba
}

import sparkSession.implicits._

val processorFactory = ProcessFunctions.dqFactory[Array[Byte]](rs, inCodegen, extraProjection =
  _.withColumn("vals", org.apache.spark.sql.avro.functions.from_avro(col("value"), testOnAvro.toString)).
    select("vals.*"))
  ...

extraProjection allows conversion based on existing Spark conversion functions before the rules are applied.
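
As another illustration, extraProjection can apply any standard Spark conversion; a hypothetical sketch parsing JSON strings instead of Avro bytes (this relies on Spark naming a primitive input column "value" and assumes dqFactory's other parameters take their defaults):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import sparkSession.implicits._

val jsonSchema = StructType(Seq(
  StructField("product", StringType),
  StructField("account", StringType),
  StructField("subcode", IntegerType)))

val jsonFactory = ProcessFunctions.dqFactory[String](rs, extraProjection =
  _.withColumn("vals", from_json(col("value"), jsonSchema)).select("vals.*"))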

Map Functions

As correlated subqueries cannot be run outside of Spark, the Quality map functions must be used:

registerQualityFunctions()
import sparkSession.implicits._ // needed for toDF below

val theMap = Seq((40, true),
  (50, false),
  (60, true)
)
val lookups = mapLookupsFromDFs(Map(
  "subcodes" -> ( () => {
    val df = theMap.toDF("subcode", "isvalid")
    (df, column("subcode"), column("isvalid"))
  } )
), LocalBroadcast(_))

registerMapLookupsAndFunction(lookups)

val rs = RuleSuite(Id(1,1), Seq(
  RuleSet(Id(50, 1), Seq(
    Rule(Id(100, 1), ExpressionRule("if(product like '%otc%', account = '4201', mapLookup('subcodes', subcode))"))
  ))
))

Note the use of LocalBroadcast; this implementation of Spark's Broadcast can be used without a SparkSession and simply wraps the value.
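
Putting this together, a minimal sketch (TestData is an assumed case class matching the rule's fields) of running the mapLookup rule through a processor:

case class TestData(product: String, account: String, subcode: Int)

import sparkSession.implicits._
val factory = dqFactory[TestData](rs)

val processor = factory.instance
try {
  // the lookup resolves against the LocalBroadcast wrapped value
  val result: RuleSuiteResult = processor(TestData("otc swap", "4201", 40))
} finally {
  processor.close()
}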

Performance

All the information presented below is captured in the Processor benchmark.
The run is informative but has some outlier behaviours and should be taken as a guideline only (be warned: it takes almost a day to run).
This test evaluates compilation startup time alone in the XStartup tests, and the time for both startup and running through 100k rows at each fieldCount in a single thread (on an i9-9900K CPU @ 3.60GHz). The inputs for each row are an array of longs, provided by Spark's user-land Row, with the output a RuleSuiteResult object.

Test combinations to actual rules

rulesetCount  fieldCount  actual number of rules
25 10 30
25 20 55
25 30 80
25 40 105
25 50 130
50 10 60
50 20 110
50 30 160
50 40 210
50 50 260
75 10 90
75 20 165
75 30 240
75 40 315
75 50 390
100 10 120
100 20 220
100 30 320
100 40 420
100 50 520
125 10 150
125 20 275
125 30 400
125 40 525
125 50 650
150 10 180
150 20 330
150 30 480
150 40 630
150 50 780

As noted above, the fastest startup time is with compile = false as no compilation takes place; this holds true until about the 780 rule mark, where compilation catches up with the cost of traversing and copying the expression trees. Each subsequent instance call will, however, pay that cost again; moreover the actual runtime is by far the worst option:

default_vs_interpreted_processor.png

The lower green line represents the default configuration, which compiles a class and, in the general case, only creates new instances. The graph below shows the performance trend across multiple rule and field complexities:

default_processor_combos.png

In the top right case that's 780 rules in total (run across 50 fields) with a cost of about 0.103ms per row (10,300ms / 100,000 rows), or 0.000132ms per rule per row of simple mod checks.

The performance of the default configuration, leveraging Spark's MutableProjections, is consistently the second best except for far smaller numbers of rule and field combinations (observable by selecting the 10 fieldCount); every other combination has the default CompiledProjections (GenerateDecoderOpEncoderProjection) in second place by a comfortable margin. First place belongs to the experimental VarCompilation; see the info box below for more details.

Experimental - VarCompilation

The default of forceVarCompilation = false uses a light compilation wrapper around Spark's MutableProjection approach, with the Spark team doing the heavy lifting.

In contrast, the forceVarCompilation = true option triggers the experimental VarCompilation, mimicking WholeStageCodegen (albeit without its severe input size restrictions).
Its additional speed is driven by JIT-friendly optimisations and the removal of all unnecessary work, encoding from the input data only what the rules need. The experimental label is due to the custom code approach: although it can handle thousands of fields actively used in thousands of rules and is fully tested, it is still custom code. It may become the default option in the future.

The majority of the cost is the serialisation of the results into the RuleSuiteResult's Scala Maps (via Spark's ArrayBasedMapData.toScalaMap).

If you remove the cost of serialisation, by serialising lazily, things look even faster:

dq_processing_vs_lazy.png

The top two lines are the default and VarCompilation, the bottom two their lazy versions; that's 0.0172453ms per row and 0.000022109ms per mod check. The dqLazyDetailsFactory function serialises the overall result directly but only serialises the details on demand, so you can choose whether to process the details based on the overall result.

Precalculating Passed RuleSuiteResultDetails can be misleading

Although it's possible to use a pre-calculated RuleSuiteResultDetails for all "Passed" results, this would not represent any disabled, soft failed or probability results. As such it's not provided by default; if you do have a default RuleSuiteResultDetails you would like to use, you can provide it to the dqLazyDetailsFactory function using the RuleSuiteResultDetails.ifAllPassed function and the defaultIfPassed parameter.
Using the defaultIfPassed parameter stops actual results from being returned when a row passes; only the default you supplied is returned.

dqLazyDetailsFactory is also useful if you just want to see whether the rules passed and aren't interested in the details.
Similarly, the lazyRuleEngineFactory and lazyRuleFolderFactory functions are lazy in their RuleSuiteResult serialisation, which may be appropriate when you only need the rule results should you not get an output result.
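
As a rough sketch of the lazy flow (the exact result type returned by dqLazyDetailsFactory is not shown here; assume the overall result is eager while a details accessor serialises on demand — both the Passed comparison and the details field are hypothetical):

// hypothetical shape: overallResult is eager, details serialise only when accessed
val lazyFactory = dqLazyDetailsFactory[InputData](ruleSuite)
val processor = lazyFactory.instance
try {
  val result = processor(new InputData(...))
  if (result.overallResult != Passed) {
    // pay the details serialisation cost only when they are needed
    val details = result.details
  }
} finally {
  processor.close()
}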

