SQL Functions Documentation

_

_( [ddl type], [nullable] ) provides placeholders for lambda functions to allow partial application. Use them in place of actual values or expressions, either to change arity or to allow use in _lambda_.

The default type is Long / Bigint; you must provide the type explicitly when using anything else. Placeholders are assumed to be nullable by default (i.e. true); pass false to state that the field should not be null.
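As a loose analogy in plain Scala (not the library's SQL syntax), partial application fixes some arguments of a function and leaves a placeholder for the rest, which reduces its arity. The names `plus` and `plusFive` are hypothetical and exist only for this sketch.

```scala
object PlaceholderSketch {
  // A two-argument function (arity 2).
  val plus: (Long, Long) => Long = _ + _

  // Fixing the first argument and leaving a placeholder for the second
  // yields a one-argument function (arity 1).
  val plusFive: Long => Long = plus(5L, _)
}
```

Here `PlaceholderSketch.plusFive(2L)` evaluates `plus(5L, 2L)`.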

_lambda_

_lambda_( user function ) extracts the Spark LambdaFunction from a resolved user function. The lambda must have the types expected by the Spark HigherOrderFunction it is passed to.

This allows using user defined functions and lambdas with built-in Spark HigherOrderFunctions.

aggExpr

aggExpr( [ddl sum type], filter, sum, result) aggregates on rows which match the filter expression using the sum expression to aggregate then processes the results using the result expression.

You can run multiple aggExpr expressions in a single-pass select. Use the first parameter to thread DDL type information through to the sum and result functions.
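The filter / sum / result flow can be modelled over an in-memory collection. This is a minimal sketch in plain Scala, not the Spark implementation: the `aggExpr` method below is a hypothetical stand-in, and it assumes the count passed to the result function is the number of rows that matched the filter.

```scala
object AggExprSketch {
  // Hypothetical in-memory model of aggExpr(filter, sum, result):
  // only rows matching `filter` are folded by `sum`, then `result`
  // receives the final sum and the number of rows counted.
  def aggExpr[R, S, T](rows: Seq[R], zero: S)(
      filter: R => Boolean,
      sum: (S, R) => S,
      result: (S, Long) => T): T = {
    val (total, count) = rows.filter(filter).foldLeft((zero, 0L)) {
      case ((acc, n), row) => (sum(acc, row), n + 1)
    }
    result(total, count)
  }
}
```

For example, the mean of the even values in `Seq(1L, 2L, 3L, 4L)` can be expressed as `AggExprSketch.aggExpr(Seq(1L, 2L, 3L, 4L), 0L)(_ % 2 == 0, _ + _, (s, c) => s.toDouble / c)`.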

bigBloom

bigBloom(buildFrom, expectedSize, expectedFPP, 'bloom_id') creates an aggregated bloom filter using the buildFrom expression.

The blooms are stored on a shared filesystem under the bloom_id, so they can scale to high numbers of items whilst keeping the FPP (e.g. millions of items at 0.01, which implies 99% probability). You may have to cast the FPP to double in Spark 3.2.

buildFrom can be driven by digestToLongs or hashWith functions when using multiple fields.
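To get a feel for how expectedSize and expectedFPP drive the size of a bloom, the standard textbook sizing formulas can be sketched as below. This is an illustration only: it assumes the classic optimal-size formulas, which may differ from what bigBloom actually uses, and `BloomSizingSketch` and its methods are hypothetical names.

```scala
object BloomSizingSketch {
  // Textbook optimal bit count: m = -n * ln(p) / (ln 2)^2
  def optimalBits(expectedSize: Long, expectedFPP: Double): Long =
    math.ceil(-expectedSize * math.log(expectedFPP) / (math.log(2) * math.log(2))).toLong

  // Textbook optimal number of hash functions: k = (m / n) * ln 2
  def optimalHashes(expectedSize: Long, bits: Long): Int =
    math.max(1, math.round(bits.toDouble / expectedSize * math.log(2)).toInt)
}
```

Under these formulas one million items at an FPP of 0.01 need roughly 9.6 million bits, so the storage grows linearly with the expected number of items.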

callFun

callFun( user function lambda variable, param1, param2, … paramN ) used within a lambda function it allows calling a lambda variable that contains a user function.

Used from the top level sql it performs a similar function expecting either a full user function or a partially applied function, typically returned from another lambda user function.

coalesceIfAttributesMissing

coalesceIfAttributesMissing(expr, replaceWith) substitutes expr with the replaceWith expression when expr has missing attributes in the source dataframe. Your code must call the Scala processIfAttributeMissing function before the rule suite is used in validate or ruleEngineRunner/ruleRunner:

val missingAttributesAreReplacedRS = processIfAttributeMissing(rs, struct)

val (errors, _) = validate(struct, missingAttributesAreReplacedRS)

// use missingAttributesAreReplacedRS with your dataframe

coalesceIfAttributesMissingDisable

coalesceIfAttributesMissingDisable(expr) substitutes expr with the DisabledRule Integer result (-2) when expr has missing attributes in the source dataframe. Your code must call the Scala processIfAttributeMissing function before the rule suite is used in validate or ruleEngineRunner/ruleRunner:

val missingAttributesAreReplacedRS = processIfAttributeMissing(rs, struct)

val (errors, _) = validate(struct, missingAttributesAreReplacedRS)

// use missingAttributesAreReplacedRS with your dataframe

digestToLongs

digestToLongs('digestImpl', fields*) creates an array of longs based on creating the given MessageDigest impl. A 128-bit impl will generate two longs from its digest.
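The relationship between digest width and the number of longs can be sketched with the JDK's MessageDigest. This is a minimal illustration under stated assumptions: the helper `digestToLongs` below is hypothetical, it only handles string fields encoded as UTF-8, and its byte order may not match the library's encoding.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object DigestToLongsSketch {
  // Hypothetical helper, not the library's implementation.
  def digestToLongs(digestImpl: String, fields: String*): Array[Long] = {
    val md = MessageDigest.getInstance(digestImpl)
    // Feed every field into the same digest instance.
    fields.foreach(f => md.update(f.getBytes(StandardCharsets.UTF_8)))
    val buffer = ByteBuffer.wrap(md.digest())
    // Read the digest 8 bytes at a time; trailing bytes that do not
    // fill a whole long are ignored in this sketch.
    Array.fill(buffer.remaining() / java.lang.Long.BYTES)(buffer.getLong())
  }
}
```

With this sketch MD5 (128 bits) yields two longs and SHA-256 yields four.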

digestToLongsStruct

digestToLongsStruct('digestImpl', fields*) creates structure of longs with i0 to iN named fields based on creating the given MessageDigest impl.

disabledRule

disabledRule() returns the DisabledRule Integer result (-2) for use in filtering and to disable rules (which may not signify a version bump)

failed

failed() returns the Failed Integer result (0) for use in filtering

fieldBasedID

fieldBasedID('prefix', 'digestImpl', fields*) creates a variable bit length id by using a given MessageDigest impl over the fields, prefix is used with the _base, _i0 and _iN fields in the resulting structure

flattenResults

flattenResults(dataQualityExpr) expands data quality results into a flat array

flattenRuleResults

flattenRuleResults(dataQualityExpr) expands data quality results into a structure of flattenedResults, salientRule (the one used to create the output) and the rule result.

salientRule will be null if there was no matching rule

hashFieldBasedID

hashFieldBasedID('prefix', 'digestImpl', fields*) creates a variable bit length id by using a given Guava Hasher impl over the fields, prefix is used with the _base, _i0 and _iN fields in the resulting structure

hashWith

hashWith('HASH', fields*) Generates a hash value (array of longs) suitable for using in blooms based on the given Guava hash implementation.

Note: based on testing, the digestToLongs function is faster for SHA256 and MD5.

Valid hashes: MURMUR3_32, MURMUR3_128, MD5, SHA-1, SHA-256, SHA-512, ADLER32, CRC32, SIPHASH24. When an invalid HASH name is provided MURMUR3_128 will be chosen.

Open source Spark 3.1.2 issues

On open source Spark 3.1.2 this may produce resolver errors due to a downgrade of the Guava version: Databricks uses 15.0 and open source 3.0.3 uses 16.0.1, but 3.1.2 drops to 11, which lacks crc32, sipHash24 and adler32.

hashWithStruct

As per hashWith('HASH', fields*), but generates a struct with i0 to ix named longs. This structure is not suitable for blooms.

idEqual

idEqual(leftPrefix, rightPrefix) takes two prefixes and matches on leftPrefix_base = rightPrefix_base and the corresponding i0 and i1 fields. It does not currently support more than two i fields.
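The comparison described above is equivalent in spirit to a hand-written predicate over the three prefixed fields. The sketch below builds that predicate as a string; `idEqualPredicate` is a hypothetical helper for illustration, and the exact expression the library generates may differ.

```scala
object IdEqualSketch {
  // Hypothetical helper: spells out the field-by-field comparison
  // for the base, i0 and i1 fields of two prefixed ids.
  def idEqualPredicate(leftPrefix: String, rightPrefix: String): String =
    Seq("base", "i0", "i1")
      .map(field => s"${leftPrefix}_$field = ${rightPrefix}_$field")
      .mkString(" and ")
}
```

For the prefixes `a` and `b` this yields `a_base = b_base and a_i0 = b_i0 and a_i1 = b_i1`.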

inc

inc() increments the current sum by 1

Alternatives:

inc( x ) use an expression of type Long to increment

longPair

longPair(lower, higher) creates a structure with these lower and higher longs

longPairEqual

longPairEqual(leftPrefix, rightPrefix) takes two prefixes which will be used to match leftPrefix_lower = rightPrefix_lower and leftPrefix_higher = rightPrefix_higher

longPairFromUUID

longPairFromUUID(expr) converts a UUID to a structure with lower and higher longs
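A UUID is a 128-bit value, so it maps naturally onto two longs. The sketch below uses java.util.UUID and assumes that lower corresponds to the least significant bits and higher to the most significant bits; that mapping, and the `LongPair` case class, are assumptions made for illustration.

```scala
import java.util.UUID

object LongPairSketch {
  // Hypothetical stand-in for the lower / higher structure.
  final case class LongPair(lower: Long, higher: Long)

  // Split a UUID into its two 64-bit halves.
  def fromUUID(uuid: UUID): LongPair =
    LongPair(uuid.getLeastSignificantBits, uuid.getMostSignificantBits)

  // Rebuild the UUID; the java.util.UUID constructor takes
  // (mostSignificantBits, leastSignificantBits).
  def toUUID(pair: LongPair): UUID = new UUID(pair.higher, pair.lower)
}
```

The round trip `toUUID(fromUUID(uuid))` returns the original UUID, which mirrors converting a long pair back to a string uuid via rngUUID.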

mapContains

mapContains(expr, 'mapid') returns true if there is an item in the map

mapLookup

mapLookup(expr, 'mapid') returns either the lookup in map specified by mapid or null

meanF

meanF() simple mean on the results, expecting sum and count type Long

murmur3ID

murmur3ID('prefix', fields*) generates a 160bit id using murmur3 hashing over input fields, prefix is used with the _base, _i0 and _i1 fields in the resulting structure

packInts

packInts(lower, higher) a packaged long from two ints, used within result compression
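Packing two 32-bit ints into one 64-bit long is typically done with a shift and a mask. The sketch below assumes higher occupies the upper 32 bits and lower the bottom 32 bits; that layout and the `unpackInts` helper are assumptions for illustration, not confirmed details of the library.

```scala
object PackIntsSketch {
  // Place `higher` in the top 32 bits and `lower` in the bottom 32 bits.
  // The mask stops a negative `lower` from sign-extending into the top half.
  def packInts(lower: Int, higher: Int): Long =
    (higher.toLong << 32) | (lower.toLong & 0xFFFFFFFFL)

  // Reverse the packing, returning (lower, higher).
  def unpackInts(packed: Long): (Int, Int) =
    ((packed & 0xFFFFFFFFL).toInt, (packed >>> 32).toInt)
}
```

Round-tripping any pair through `packInts` and `unpackInts` returns the original ints, including negative values.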

passed

passed() returns the Passed Integer for use in filtering: 10000

prefixedToLongPair

prefixedToLongPair('prefix', field) converts a 128bit longpair field with the given prefix into a higher and lower long pair without prefix.

This is suitable for converting provided ids into UUIDs, for example via a further call to rngUUID.

printCode

printCode( [msg], expr ) prints the code generated by an expression, the value variable and the isNull variable and forwards eval calls / type etc. to the expression.

The code is printed once per partition to each executor's standard output, so you will have to check each executor to find the output of the nodes used. To use printCode in unit tests on a single host you may overwrite the writer function in registerQualityFunctions. You should, however, use a top-level object and var to write into (or stream): printCode will not be able to write properly to stdout (Spark redirects / captures stdout) or to non-top-level objects (due to classloader / function instance issues). Testing on other hosts without using stdout should write to a shared file location or similar.

Note: printCode is not compatible with every expression. Aggregate expressions like aggExpr or sum etc. won't generate code, so they aren't compatible with printCode.

_lambda_ is also incompatible with printCode, both when wrapping a user function and when wrapping the _lambda_ function itself. Similarly the _() placeholder function cannot be wrapped.

Any function expecting a specific signature, like aggExpr or other HigherOrderFunctions such as aggregate or filter, is unlikely to support wrapped arguments.

printExpr

printExpr( [msg], expr ) prints the expression tree via toString with an optional msg

The message is printed to the driver node's standard output, which is often shown in notebooks as well. To use printExpr in unit tests you may overwrite the writer function in registerQualityFunctions. You should, however, use a top-level object and var to write into (or stream).

probability

probability(expr) will translate probability rule results into a double, e.g. 1000 returns 0.01. This is useful for interpreting and filtering on probability based results: 0 -> 10000 non-inclusive

probabilityIn

probabilityIn(expr, 'bloomid') returns the probability of the expr being in the bloomfilter specified by bloomid.

This function either returns 0.0, where it is definitely not present, or the original FPP where it may be present.

You may use digestToLongs or hashWith as appropriate to use multiple columns safely.

providedID

providedID('prefix', existingLongs) creates an id for an existing array of longs, prefix is used with the _base, _i0 and _iN fields in the resulting structure

resultsWith

resultsWith( x ) processes the results with lambda x (e.g. (sum, count) -> sum ), which takes sum from the aggregate and count from the number of rows counted. Both the sum type and count type default to LongType.

Alternatives:

resultsWith( [sum ddl type], x) Use the given ddl type for the sum type e.g. 'MAP<STRING, DOUBLE>'
resultsWith( [sum ddl type], [result ddl type], x) Use the given ddl type for the sum and result types

returnSum

returnSum( sum type ddl ) just returns the sum and ignores the count param; it expands to resultsWith( [sum ddl_type], (sum, count) -> sum)

rng

rng() Generates a 128bit random id using XO_RO_SHI_RO_128_PP, encoded as a lower and higher long pair

Alternatives:

rng('algorithm') Uses Commons RNG RandomSource to implement the RNG
rng('algorithm', seedL) Uses Commons RNG RandomSource to implement the RNG with a long seed

rngBytes

rngBytes() Generates a 128bit random id using XO_RO_SHI_RO_128_PP, encoded as a byte array

Alternatives:

rngBytes('algorithm') Uses Commons RNG RandomSource to implement the RNG
rngBytes('algorithm', seedL) Uses Commons RNG RandomSource to implement the RNG with a long seed
rngBytes('algorithm', seedL, byteCount) Uses Commons RNG RandomSource to implement the RNG with a long seed, with a specific byte length integer (e.g. 16 is two longs, 8 is one long)

rngID

rngID('prefix') Generates a 160bit random id using XO_RO_SHI_RO_128_PP, prefix is used with the _base, _i0 and _i1 fields in the resulting structure

Alternatives:

rngID('prefix', 'algorithm') Uses Commons RNG RandomSource to implement the RNG, using other algorithms may generate more long _iN fields
rngID('prefix', 'algorithm', seedL) Uses Commons RNG RandomSource to implement the RNG with a long seed, using other algorithms may generate more long _iN fields

rngUUID

rngUUID(expr) takes either a structure with lower and higher longs or a 128bit binary type and converts to a string uuid

ruleSuiteResultDetails

ruleSuiteResultDetails(dq) strips the overallResult from the dataquality results, suitable for keeping overall result as a top-level field with associated performance improvements

saferLongPair

Deprecated: prefer uniqueID. saferLongPair(expr, 'bloomid') is a 'safer' rng that repeatedly calls the expr rng function until there is no matching entry in the bloom identified by bloomid. It returns lower and higher longs.

smallBloom

smallBloom(buildFrom, expectedSize, expectedFPP) creates a simple byte array bloom filter using the expected size and FPP (0.01 is 99%). You may have to cast the FPP to double in Spark 3.2. buildFrom can be driven by digestToLongs or hashWith functions when using multiple fields.

softFail

softFail(ruleexpr) will treat any rule failure (e.g. failed() ) as returning softFailed()

softFailed

softFailed() returns the SoftFailed Integer result (-1) for use in filtering

sumWith

sumWith( x ) adds expression x for each row processed in an aggExpr with a default of LongType

Alternatives:

sumWith( [ddl type], x) Use the given ddl type e.g. 'MAP<STRING, DOUBLE>'

uniqueID

uniqueID('prefix') generates a 160bit guaranteed unique id (requires MAC address uniqueness), with contiguous higher values within a partition and overflow using the timestamp in ms. The prefix is used with the _base, _i0 and _i1 fields in the resulting structure.

unpack

unpack(expr) takes a packed rule long and unpacks it to a .id and .version structure

unpackIdTriple

unpackIdTriple(expr) takes a packed rule triple of longs (ruleSuiteId, ruleSetId and ruleId) and unpacks it to (ruleSuiteId, ruleSuiteVersion, ruleSetId, ruleSetVersion, ruleId, ruleVersion)

updateField

updateField(structure_expr, 'field.subfield', replaceWith, 'fieldN', replaceWithN) processes structures allowing you to replace sub items (think lens in functional programming) using the structure fields path name.

This is a wrapped, almost verbatim version of Make Structs Easier's AddFields.
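The lens comparison can be illustrated in plain Scala with nested case classes: replacing a sub item means copying each enclosing level with the new value. This sketch does not use updateField itself, and `Person`, `Address` and `updateCity` are hypothetical names chosen for the example.

```scala
object UpdateFieldSketch {
  final case class Address(city: String, postcode: String)
  final case class Person(name: String, address: Address)

  // Replace the nested 'address.city' field, leaving everything else
  // untouched, much like a lens set on the path address -> city.
  def updateCity(person: Person, city: String): Person =
    person.copy(address = person.address.copy(city = city))
}
```

The original value is not modified; a new structure is returned with only the targeted path replaced.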

zaFieldBasedID

zaFieldBasedID('prefix', 'digestImpl', fields*) creates a 64bit id (96bit including header) by using a given Zero Allocation impl over the fields, prefix is used with the _base and _i0 fields in the resulting structure.

Prefer using zaLongsFieldBasedID for fewer collisions.

zaHashLongsWith

zaHashLongsWith('HASH', fields*) generates a multi length long array but with a zero allocation implementation. This structure is suitable for blooms. The default XXH3 algorithm is the 128bit version of that used by the internal bigBloom implementation.

Available HASH functions are MURMUR3_128, XXH3

zaHashLongsWithStruct

Similar to zaHashLongsWith('HASH', fields*) but generates an ID relevant multi length long struct, which is not suitable for blooms.

zaHashWith

zaHashWith('HASH', fields*) generates a single length long array, always with 64 bits, but with a zero allocation implementation. This structure is suitable for blooms. The default XX algorithm is the one used by the internal bigBloom implementation.

Available HASH functions are MURMUR3_64, CITY_1_1, FARMNA, FARMOU, METRO, WY_V3, XX

zaHashWithStruct

Similar to zaHashWith('HASH', fields*) but generates an ID relevant multi length long struct (of one long), which is not suitable for blooms.

Prefer zaHashLongsWithStruct for reduced collisions with either the MURMUR3_128 or XXH3 versions of hashes.

zaLongsFieldBasedID

zaLongsFieldBasedID('prefix', 'digestImpl', fields*) creates a variable length id by using a given Zero Allocation impl over the fields, prefix is used with the _base, _i0 and _iN fields in the resulting structure. MURMUR3_128 is faster than the Guava implementation.


Last update: March 27, 2023 09:08:01
Created: March 27, 2023 09:08:01