Ben Martens
Microsoft
Feb 05, 2019
Harnessing the Power of Left-Anti Joins in the Kusto Query Language
The Kusto query language supports a variety of joins. Left-anti might not be among the most common ones used, but it can be one of the most powerful. The docs state that a left-anti join “returns all records from the left side that do not match any record from the right side.” Let’s walk through two ways that this can be used in your processing pipeline.
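To make the semantics concrete, here is a minimal sketch using inline sample data. The table names (Processed, Incoming) and the rows are hypothetical and exist only for illustration; they are not part of the pipeline described in this post.

```kusto
// Hypothetical sample data; table names and rows are illustrative only.
let Processed = datatable(DeviceId:string, Timestamp:datetime)
[
    "dev-1", datetime(2019-02-01 00:00:00),
    "dev-2", datetime(2019-02-01 00:00:00)
];
let Incoming = datatable(DeviceId:string, Timestamp:datetime)
[
    "dev-1", datetime(2019-02-01 00:00:00),
    "dev-2", datetime(2019-02-01 00:00:00),
    "dev-3", datetime(2019-02-01 00:05:00)
];
// Return the rows of Incoming (left side) that have no match in Processed (right side).
Incoming
| join kind = leftanti (Processed) on DeviceId, Timestamp
```

Only the dev-3 row comes back, because it is the only row on the left side without a matching DeviceId and Timestamp on the right side. The result contains just the left side's columns.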
Late-Arriving Data
Let’s say that we have an incoming stream of time-series data that we want to process. We have a function called ProcessData(startTime:datetime, endTime:datetime) that is executed periodically, and its results are written to a table called Output via .set-or-append commands. The function processes data between the two timestamps passed as parameters. Since we don’t want to end up with duplicate rows, we can’t simply rerun it with the same time window. We can, however, catch the late-arriving data for that time window by implementing ProcessData so that it reprocesses all the data from the previous day as well and then does a left-anti join against the Output table to return only the results that haven’t been recorded yet. Anything new gets written to the Output table by the .set-or-append command, and the duplicates get thrown away.
.create-or-alter function with (folder = "demo", skipvalidation = "true") ProcessData (startTime:datetime, endTime:datetime) {
    // Reprocess one extra day before startTime to pick up late-arriving rows.
    let lookback = 1d;
    let allData =
        SourceData
        | where Timestamp >= startTime - lookback and Timestamp < endTime
    ;
    // Keep only the rows that are not already present in Output.
    allData
    | join kind = leftanti (Output) on DeviceId, Timestamp
}
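As a rough sketch of how a scheduled run might call this function, the command below appends the function's results to the Output table. The one-hour window is an assumed example and not something specified in this post; whatever scheduler triggers the command would supply the real timestamps.

```kusto
.set-or-append Output <|
    // The one-hour window below is an assumed example, not taken from the original post.
    ProcessData(datetime(2019-02-05 00:00:00), datetime(2019-02-05 01:00:00))
```

Because the function only returns rows that are missing from Output, running the command again for an overlapping window should append only the rows that arrived late.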
[Update 2019-02-21] The Kusto docs have a good document on dealing with late arriving data.
Changelog
Left-anti joins can also be used to create a changelog. Let’s say there is a process that is dumping 500,000 rows of data into a table. Those rows contain information about a set of devices. The table gets dropped and replaced every day. We can make a CreateChangelog() function whose results get written to the Output table via .set-or-append commands.
We can do a left-anti join with the data we already have in Output and only write the rows that have changed. So the CreateChangelog function body would look something like this:
DeviceData
| where PreciseTimeStamp >= startTime and PreciseTimeStamp < endTime
| project DeviceId, DimensionA
| join kind = leftanti(
Output
| project DeviceId, DimensionA
) on DeviceId, DimensionA // match on the value too, so that modified devices are returned
| project DeviceId, DimensionA, ProcessedTime=now()
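Now the Output table has a record of every time that a device was added or modified. Removed devices are not captured by this query, because a left-anti join only returns rows from the left side (the new data).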
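The choice of join keys decides which changes are detected. The sketch below uses hypothetical inline data (ExistingOutput and TodaysDeviceData are illustrative names, not tables from this post) to show what a join on both DeviceId and DimensionA returns.

```kusto
// Hypothetical sample data; table names and rows are illustrative only.
let ExistingOutput = datatable(DeviceId:string, DimensionA:string)
[
    "dev-1", "A",
    "dev-2", "B"
];
let TodaysDeviceData = datatable(DeviceId:string, DimensionA:string)
[
    "dev-1", "A",
    "dev-2", "C",
    "dev-3", "A"
];
// Joining on both columns returns new devices and devices whose DimensionA changed.
TodaysDeviceData
| join kind = leftanti (ExistingOutput) on DeviceId, DimensionA
| extend ProcessedTime = now()
```

This returns the dev-2 row (its DimensionA changed from B to C) and the dev-3 row (a new device). Joining on DeviceId alone would return only dev-3, since dev-2 already has a row on the right side.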