Processors - Row By Row
Quality Processors allow for Quality rules to be used on a jvm outside of Spark execution. Spark is required for expression resolution and compilation so the pattern of usage is:
import com.sparkutils.quality.sparkless.ProcessFunctions._
case class InputData(fields)
val sparkSession = SparkSession.builder().
config("spark.master", s"local[1]").
config("spark.ui.enabled", false).getOrCreate()
registerQualityFunctions() // optional
try {
val ruleSuite = // get rulesuite
import sparkSession.implicits._
// thread safe to share
val processorFactory = dqFactory[InputData](ruleSuite)
// in other threads an instance is needed
val threadSpecificProcessor = processorFactory.instance
try {
val dqResults: RuleSuiteResult = threadSpecificProcessor(new InputData(...))
} finally {
// when your thread is finished doing work close the instance
threadSpecificProcessor.close()
}
} finally {
sparkSession.stop()
}
Stateful expressions ruin the fun¶
Given the comment about "no Spark execution" why is a sparkSession present? The Spark infrastructure is used to compile code, this requires a running spark task or session to obtain configuration and access to the implicits for encoder derivation. IF the rules do not include stateful expressions (why would they?) and you use the default compilation this is also possible:
import com.sparkutils.quality.sparkless.ProcessFunctions._
case class InputData(fields)
val sparkSession = SparkSession.builder().
config("spark.master", s"local[1]").
config("spark.ui.enabled", false).getOrCreate()
val ruleSuite = // get rulesuite
import sparkSession.implicits._
// thread safe to share
val processorFactory = dqFactory[InputData](ruleSuite)
sparkSession.stop()
// in other threads an instance is needed
val threadSpecificProcessor = processorFactory.instance
try {
threadSpecificProcessor.initialize(partitionId) // Optional, see below Partitions note
val dqResults: RuleSuiteResult = threadSpecificProcessor(new InputData(...))
} finally {
// when your thread is finished doing work close the instance
threadSpecificProcessor.close()
}
The above bold IF is ominous, why the caveat? Stateful expressions using compilation are fine, the state handling is moved to the compiled code. If, however, the expressions are "CodegenFallback" and run in interpreted mode then each thread needs its own state. The same is true for using compile = false as a parameter, as such it's recommended to stick with defaults and avoid stateful expressions such as monotonically_incrementing_id, rand or unique_id.
If the rules are free of such stateful expressions then the .instance function is nothing more than a call to a constructor on pre-compiled code.
In short, given stateful expressions can provide different answers for the same inputs it's something to be avoided unless you really need that behaviour.
Thread Safety¶
In all combinations of ProcessorFactory's the factory itself is thread safe and may be shared, the instances themselves are not and use mutable state to achieve performance.
Partitions / initialize?¶
Despite all the above commentary on Stateful expressions being awkward to use, if you choose to then you should use the initialize function with a unique integer parameter for each thread.
If you are not using stateful expressions you don't need to call initialize.
Encoders and Input types¶
The output types of all the runners are well-defined but, like the input types, rely on Spark Encoder's to abstract from the actual types.
For simple beans it's enough to use the Spark Encoders.bean(Class[_]) to derive an encoder or, when using Scala, Frameless encoding derivation.
Java Lists and Maps need special care
Using Java lists or maps with Encoders.bean doesn't work very often, the type information isn't available to the Spark code.
In Spark 3.4 and above you can use AgnosticEncoders instead and specify the types.
What about something more interesting like an Avro message?
val testOnAvro = SchemaBuilder.record("testOnAvro")
.namespace("com.teston")
.fields()
.requiredString("product")
.requiredString("account")
.requiredInt("subcode")
.endRecord()
val datumWriter = new GenericDatumWriter[GenericRecord](testOnAvro);
val bos = new ByteArrayOutputStream()
val enc = EncoderFactory.get().binaryEncoder(bos, null)
val avroTestData = testData.map{d =>
val r = new GenericData.Record(testOnAvro)
r.put("product", d.product)
r.put("account", d.account)
r.put("subcode", d.subcode)
datumWriter.write(r, enc)
enc.flush()
val ba = bos.toByteArray
bos.reset()
ba
}
import s.implicits._
val processorFactory = ProcessFunctions.dqFactory[Array[Byte]](rs, inCodegen, extraProjection =
_.withColumn("vals", org.apache.spark.sql.avro.functions.from_avro(col("value"), testOnAvro.toString)).
select("vals.*"))
...
extraProjection allows conversion based on existing Spark conversion functions.
Map Functions¶
As correlated subqueries cannot be run outside of Spark the Quality Map functions must be used:
registerQualityFunctions()
val theMap = Seq((40, true),
(50, false),
(60, true)
)
val lookups = mapLookupsFromDFs(Map(
"subcodes" -> ( () => {
val df = theMap.toDF("subcode", "isvalid")
(df, column("subcode"), column("isvalid"))
} )
), LocalBroadcast(_))
registerMapLookupsAndFunction(lookups)
val rs = RuleSuite(Id(1,1), Seq(
RuleSet(Id(50, 1), Seq(
Rule(Id(100, 1), ExpressionRule("if(product like '%otc%', account = '4201', mapLookup('subcodes', subcode))"))
))
))
Note the use of LocalBroadcast, this implementation of Sparks Broadcast can be used without a SparkSession and just wraps the value.
Created: May 21, 2025 17:12:07