Usage of JOIN in ASA caused mismatch

Time:02-05

I ran into an issue in my ASA (Azure Stream Analytics) code. The idea is to match two inputs, Record1 and Record2. Both have key1 and key2.

From Record2 I build two tables. Rows where key2 is not null are grouped by key1 and key2 into Record2WithKey2. Rows where key2 is null are grouped by key1 into Record2WithoutKey2. If a row from Record2WithKey2 and a row from Record2WithoutKey2 have the same key1, they are merged and keep the key2 from Record2WithKey2; otherwise the row stays as it is. The result is a new table, FinalRecord2.

Then I match Record1 against FinalRecord2. If both key1 and key2 match, the row from Record1 is marked as valid. If key1 matches and key2 in FinalRecord2 is null, the row from Record1 is marked as invalid. The result is then output as a table. Here is my code:

With Record2WithKey2 AS (
  SELECT [key1], [key2], COLLECT([values]) [values]
  FROM Record2
  WHERE key2 IS NOT NULL
  GROUP BY System.Timestamp(), key1, key2
),
Record2WithoutKey2 AS (
  SELECT [key1], '0' [key2], COLLECT([values]) [values]
  FROM Record2
  WHERE key2 IS NULL
  GROUP BY System.Timestamp(), key1
),
FinalRecord2 AS (
  SELECT e1.key1 [key1], e1.key2 [key2], UDF.ConcatArrays(e1.values, e2.values) [values]
  FROM Record2WithKey2 e1
  LEFT OUTER JOIN Record2WithoutKey2 e2
  ON DATEDIFF(second, e1, e2) = 0
  AND e1.key1 = e2.key1
  
  UNION ALL
  
  SELECT e2.key1 [key1], e2.key2 [key2], e2.values [values]
  FROM Record2WithoutKey2 e2
  LEFT OUTER JOIN Record2WithKey2 e1
  ON DATEDIFF(second, e1, e2) = 0
  AND e1.key1 = e2.key1
  WHERE e1.key1 IS NULL
),
JoinedRecord AS (
  SELECT r1.*, r2.values [values], 'Valid' [Result]
  FROM FinalRecord2 r2
  INNER JOIN Record1 r1
  ON DATEDIFF(second, r1, r2) = 0
  AND r1.key1 = r2.key1
  AND r1.key2 = r2.key2

  UNION ALL

  SELECT r1.*, r2.values [values], 'Invalid' [Result]
  FROM FinalRecord2 r2
  INNER JOIN Record1 r1
  ON DATEDIFF(second, r1, r2) = 0
  AND r1.key1 = r2.key1
  AND r2.key2 = '0'
)
SELECT * 
INTO Output
From JoinedRecord

Record1:

[
    {
        "key1": "value1",
        "key2": "value2"
    },
    {
        "key1": "value3",
        "key2": "value4"
    },
    {
        "key1": "value5",
        "key2": "value6"
    },
    {
        "key1": "value5",
        "key2": "value7"
    },
    {
        "key1": "value8",
        "key2": "value9"
    }
]

Record2:

[
    {
        "key1": "value1",
        "key2": "value2",
        "values": 1
    },
    {
        "key1": "value3",
        "key2": null,
        "values": 2
    },
    {
        "key1": "value5",
        "key2": "value6",
        "values": 3
    },
    {
        "key1": "value5",
        "key2": null,
        "values": 4
    },
    {
        "key1": "value5",
        "key2": "value7",
        "values": 5
    }
]

Expected Output:

[
    {
        "key1": "value1",
        "key2": "value2",
        "values": [1],
        "Result": "Valid"
    },
    {
        "key1": "value3",
        "key2": "value4",
        "values": [2],
        "Result": "Invalid"
    },
    {
        "key1": "value5",
        "key2": "value6",
        "values": [3,4],
        "Result": "Valid"
    },
    {
        "key1": "value5",
        "key2": "value7",
        "values": [4,5],
        "Result": "Valid"
    }
]

But I actually got:

{
    "key1": "value5",
    "key2": "value6",
    "values": [3,4],
    "Result": "Valid"
},
{
    "key1": "value5",
    "key2": "value7",
    "values": [4,5],
    "Result": "Valid"
}

Does anyone know what the problem is with my ASA code? Any suggestions would be highly appreciated. Thanks!

CodePudding user response:

UPDATED to accommodate:

  • key1 can have multiple key2 in record1
  • key1 can have multiple key2, including null, in record2

Adding the above constraints forced me to use UNION again. With that I was able to realize what was going on in your query.

ASA is a stream processor. That means that each step of the query happens on a timeline. Every blocking operation, like GROUP BY or JOIN, requires a time component. See the ASA documentation for more details.

But on paper, when operating at a given timestamp, we should be able to forget about the timeline. If we aggregate with a snapshot window, and set the DATEDIFF at 0 for joins, then the entire query should execute in the same timespan and behave like a normal SQL query in a standard SQL database.

In reality, some operations can change the timestamp of the records they process. Here it's due to the LEFT JOIN. In ASA all joins are temporal. Let's say we are joining two streams of data, A and B, with A LEFT JOIN B ON DATEDIFF(second, A, B) = 0. When we get a record from A, we wait for the expected duration (here 0 seconds) for matching records from B. Let's say none arrives. When the clock moves to the next tick (the next second here), we declare the period over and can release that lonely record from A. But its timestamp has to be updated: it is emitted at the new clock time, which has moved forward by one tick (1 second).
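
As a rough illustration of that timeout behavior, here is a toy JavaScript model of a temporal LEFT JOIN. It is only a sketch: the function name temporalLeftJoin, the integer ts field (seconds) and the sample rows are assumptions made for this example, and it does not reflect how ASA is implemented internally.

```javascript
// Toy model of A LEFT JOIN B ON DATEDIFF(second, A, B) = 0 AND A.key1 = B.key1.
// Illustration only: hypothetical helper, not ASA's actual engine.
function temporalLeftJoin(left, right, tickSeconds) {
    const out = [];
    for (const a of left) {
        // A match must share the key and the exact same timestamp (DATEDIFF = 0).
        const matches = right.filter(b => b.key1 === a.key1 && b.ts === a.ts);
        if (matches.length > 0) {
            // Matched rows are known inside the window and keep the original timestamp.
            for (const b of matches) {
                out.push({ key1: a.key1, left: a.value, right: b.value, ts: a.ts });
            }
        } else {
            // Unmatched rows are released only after the clock moves past the window,
            // so they are emitted with the timestamp of the next tick.
            out.push({ key1: a.key1, left: a.value, right: null, ts: a.ts + tickSeconds });
        }
    }
    return out;
}

// ts is expressed in seconds relative to an arbitrary start time.
const streamA = [
    { key1: "value1", value: 1, ts: 0 },
    { key1: "value5", value: 3, ts: 0 },
];
const streamB = [{ key1: "value5", value: 4, ts: 0 }];

const joined = temporalLeftJoin(streamA, streamB, 1);
console.log(joined);
```

In this model the value5 row finds a match and stays at ts 0, while the value1 row has no partner and comes out at ts 1. Any later step that joins on an exact timestamp match will therefore miss it.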

Here's a first query that works, despite using LEFT JOIN:


WITH
-- Cleaning and timestamp
Record1preprocessed AS (
    SELECT
        CAST([key1] AS NVARCHAR(MAX)) AS key1,
        CAST([key2] AS NVARCHAR(MAX)) AS key2
    FROM record1 TIMESTAMP BY appTimeStamp
),

-- Cleaning and timestamp
Record2preprocessed AS (
    SELECT
        CAST([key1] AS NVARCHAR(MAX)) AS key1,
        CAST([key2] AS NVARCHAR(MAX)) AS key2,
        CAST([values] AS BIGINT) AS [values]
    FROM record2 TIMESTAMP BY appTimeStamp
),

Record2WithKey2 AS (
    SELECT
        [key1],
        [key2],
        COLLECT([values]) AS [values],
        System.Timestamp() AS ts
    FROM Record2preprocessed
    WHERE key2 IS NOT NULL
    GROUP BY System.Timestamp(), key1, key2
),

Record2WithoutKey2 AS (
    SELECT
        [key1],
        '0' AS [key2],
        COLLECT([values]) [values],
        System.Timestamp() AS ts
    FROM Record2preprocessed
    WHERE key2 IS NULL
    GROUP BY System.Timestamp(), key1
),

FinalRecord2 AS (
    -- Records with key2 are joined with records without to merge their arrays
    SELECT
        e1.key1,
        e1.key2,
        UDF.ConcatArrays(e1.[values], e2.[values]) AS [values],
        System.Timestamp() AS ts
    FROM Record2WithKey2 e1
    LEFT JOIN Record2WithoutKey2 e2
    ON  DATEDIFF(second, e1, e2) = 0
        AND e1.key1 = e2.key1

    UNION ALL

    -- Records without key2 are joined with records with key2 to filter them out (they are already in the first subquery above)
    -- Here we only want the key1 that never have a key2
    SELECT
        e2.key1,
        e2.key2,
        e2.[values] [values],
        System.Timestamp() AS ts
    FROM Record2WithoutKey2 e2
    LEFT JOIN Record2WithKey2 e1
    ON  DATEDIFF(second, e1, e2) = 0
        AND e1.key1 = e2.key1
    WHERE e1.key1 IS NULL
),

JoinedRecord AS (
    SELECT
        r1.key1,
        r1.key2,
        r2.[values],
        CASE WHEN r1.key2 = r2.key2 THEN 'Valid' ELSE 'Invalid' END AS [Result],
        System.Timestamp() AS ts
    FROM Record1preprocessed r1
    LEFT JOIN FinalRecord2 r2
    ON  DATEDIFF(second, r1, r2) BETWEEN 0 AND 1
        AND r1.key1 = r2.key1
    WHERE
        (r2.key2 = '0')
        OR (r1.key2 = r2.key2)
)

SELECT * INTO OutputFinal FROM JoinedRecord

It uses this JavaScript UDF:

function main(arg1, arg2) {
    if (arg2 == null) {return arg1;}
    else {return arg1.concat(arg2);}
}
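
To check what the UDF returns for matched and unmatched rows, the same body can be run locally. This is a minimal sketch: the name concatArrays is a hypothetical local stand-in for main, and it assumes the unmatched side of the LEFT JOIN reaches the UDF as null, which is what the null check suggests.

```javascript
// Same body as the UDF, under a hypothetical local name so it can run outside ASA.
function concatArrays(arg1, arg2) {
    if (arg2 == null) { return arg1; }
    else { return arg1.concat(arg2); }
}

// Matched row: both sides of the LEFT JOIN carry an array, so they are concatenated.
const mergedValues = concatArrays([3], [4]);
// Unmatched row: the right side is null, so the left array is returned unchanged.
const leftOnlyValues = concatArrays([1], null);

console.log(mergedValues, leftOnlyValues);
```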

You'll notice that it looks a lot like yours, and for the most part it is. I've added some pre-processing steps to assign an explicit timestamp. It shouldn't be strictly necessary, but I wanted to be sure.

The new input records are:

  • Record1
{"key1": "value1","key2": "value2","appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value3","key2": "value4","appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value5","key2": "value6","appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value5","key2": "value7","appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value8","key2": "value9","appTimeStamp":"2022-02-02T00:00:00"}
  • Record2
{"key1": "value1","key2": "value2","values": 1,"appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value3","key2": null,"values": 2,"appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value5","key2": "value6","values": 3,"appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value5","key2": null,"values": 4,"appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value5","key2": "value7","values": 5,"appTimeStamp":"2022-02-02T00:00:00"}

I added SYSTEM.TIMESTAMP() to every CTE to be able to debug where the drift was happening.

If you do the same, you'll notice that FinalRecord2 gives you:

{"key1":"value5","key2":"value7","values":[5,4],"ts":"2022-02-02T00:00:00.0000000Z"}
{"key1":"value5","key2":"value6","values":[3,4],"ts":"2022-02-02T00:00:00.0000000Z"}
{"key1":"value1","key2":"value2","values":[1],"ts":"2022-02-02T00:00:01.0000000Z"}
{"key1":"value3","key2":"0","values":[2],"ts":"2022-02-02T00:00:01.0000000Z"}

Notice how the timestamp advanced by 1 second for the records that had to time out in the LEFT JOINs.

This can be addressed by giving some leeway in the JOIN condition of JoinedRecord: DATEDIFF(second, r1, r2) BETWEEN 0 AND 1.
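
The effect of that leeway can be replayed outside ASA on the FinalRecord2 rows listed above. The following is a sketch under assumptions: joinWithLeeway is a hypothetical helper, timestamps are reduced to seconds past 2022-02-02T00:00:00, and the loop only mimics the join and WHERE conditions of JoinedRecord.

```javascript
// FinalRecord2 rows as reported above; ts is in seconds past 2022-02-02T00:00:00.
const finalRecord2 = [
    { key1: "value5", key2: "value7", values: [5, 4], ts: 0 },
    { key1: "value5", key2: "value6", values: [3, 4], ts: 0 },
    { key1: "value1", key2: "value2", values: [1], ts: 1 },
    { key1: "value3", key2: "0", values: [2], ts: 1 },
];
const record1 = [
    { key1: "value1", key2: "value2", ts: 0 },
    { key1: "value3", key2: "value4", ts: 0 },
    { key1: "value5", key2: "value6", ts: 0 },
    { key1: "value5", key2: "value7", ts: 0 },
    { key1: "value8", key2: "value9", ts: 0 },
];

// Hypothetical helper mimicking DATEDIFF(second, r1, r2) BETWEEN minDiff AND maxDiff,
// the key1 equality, and the WHERE clause (r2.key2 = '0' OR r1.key2 = r2.key2).
function joinWithLeeway(r1Rows, r2Rows, minDiff, maxDiff) {
    const out = [];
    for (const r1 of r1Rows) {
        for (const r2 of r2Rows) {
            const diff = r2.ts - r1.ts; // DATEDIFF counts from r1 to r2
            const inWindow = diff >= minDiff && diff <= maxDiff;
            const keep = r2.key2 === "0" || r1.key2 === r2.key2;
            if (inWindow && r1.key1 === r2.key1 && keep) {
                out.push({
                    key1: r1.key1,
                    key2: r1.key2,
                    values: r2.values,
                    Result: r1.key2 === r2.key2 ? "Valid" : "Invalid",
                });
            }
        }
    }
    return out;
}

const strictMatches = joinWithLeeway(record1, finalRecord2, 0, 0);
const relaxedMatches = joinWithLeeway(record1, finalRecord2, 0, 1);
console.log(strictMatches.length, relaxedMatches.length);
```

With an exact match on the timestamp only the two value5 rows survive, which is the output reported in the question. Allowing a difference of 0 to 1 second also picks up the value1 and value3 rows that drifted.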

This in turn gives you the expected result:

{"key1":"value5","key2":"value7","values":[5,4],"Result":"Valid","ts":"2022-02-02T00:00:00.0000000Z"}
{"key1":"value5","key2":"value6","values":[3,4],"Result":"Valid","ts":"2022-02-02T00:00:00.0000000Z"}
{"key1":"value1","key2":"value2","values":[1],"Result":"Valid","ts":"2022-02-02T00:00:01.0000000Z"}
{"key1":"value3","key2":"value4","values":[2],"Result":"Invalid","ts":"2022-02-02T00:00:01.0000000Z"}

Adding SYSTEM.TIMESTAMP() to each step is an absolute must when debugging such issues. This behavior doesn't happen with INNER JOINs. I'll try to update the query to remove the LEFT JOINs.


BEFORE THE UPDATE

Here's my take on this problem:

WITH

-- First we regroup record2 to have only one row per key1. We also replace null key2 by a hardcoded value
Record2Collected AS (
    SELECT
        [key1],
        COLLECT(CASE WHEN [key2] IS NULL THEN '0' ELSE [key2] END) AS [key2],
        COLLECT([values]) [values]
    FROM Record2
    GROUP BY System.Timestamp(), key1
)

-- Then we re-expand all key combinations, but this time each one carries all the values. Then we filter: either there's a single key2 (the array of key2 has a single value) or we want the key2 that's not null
,Record2Processed AS (
    SELECT
        R.key1,
        C.ArrayValue AS key2,
        R.[values]
    FROM Record2Collected R
    CROSS APPLY GetArrayElements(R.[key2]) AS C
    WHERE
        GetArrayLength(R.key2) = 1 -- Single key2 for a key1
        OR
        C.ArrayValue != '0' -- Multiple key2 for a key1, we remove null values
)

--Then we match Record1 with Record2Processed. We do it in a single join to avoid duplications.
,JoinedRecord AS (
    SELECT
        r1.key1,
        r1.key2,
        r2.[values],
        CASE WHEN r1.key2 = r2.key2 THEN 'Valid' ELSE 'Invalid' END AS [Result]
    FROM Record2Processed r2
    LEFT JOIN Record1 r1 -- We're expecting r1 to have unique key1s here
    ON DATEDIFF(second, r1, r2) = 0 -- All records are supposed to be on the same event time, see comment below
    AND r1.key1 = r2.key1
)

SELECT * INTO myOutput FROM JoinedRecord

Which will give you:

{"key1":"value3","key2":"value4","values":[2],"Result":"Invalid"}
{"key1":"value1","key2":"value2","values":[1],"Result":"Valid"}
{"key1":"value5","key2":"value6","values":[3,4],"Result":"Valid"}

Here are some aspects that I'm cautious about:

  • We are using snapshot windows, and not using a TIMESTAMP BY, which means that the logic will apply to groups of records landing in the streaming input at the same time. This feels a bit risky. Is it the expected behavior? I would either define a time window, or use an application time.
  • In Record2Processed, values will be duplicated for any key1 that has multiple non-null key2. If this can happen, we need to update the query logic.
  • In JoinedRecord, we have to join from r2 to r1 only on key1 so we don't lose null key2s. If key1 can have multiple key2 in r1, we need to update the logic. Also, if some key combinations can be found in r1 but not in r2, they are currently lost. We would need to update the logic for that as well.
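
The duplication mentioned in the second point can be reproduced with a small JavaScript replay of Record2Collected and Record2Processed. This is a sketch under assumptions: collectByKey1 and expandKey2 are hypothetical names, the input is the Record2 sample as it appears in the question (including the value5/value7 row), and the order of elements inside an ASA COLLECT array may differ from this model.

```javascript
// Record2 sample rows from the question.
const record2 = [
    { key1: "value1", key2: "value2", values: 1 },
    { key1: "value3", key2: null, values: 2 },
    { key1: "value5", key2: "value6", values: 3 },
    { key1: "value5", key2: null, values: 4 },
    { key1: "value5", key2: "value7", values: 5 },
];

// Mimics Record2Collected: one row per key1, with a null key2 replaced by '0'.
function collectByKey1(rows) {
    const groups = new Map();
    for (const r of rows) {
        if (!groups.has(r.key1)) {
            groups.set(r.key1, { key1: r.key1, key2: [], values: [] });
        }
        const g = groups.get(r.key1);
        g.key2.push(r.key2 === null ? "0" : r.key2);
        g.values.push(r.values);
    }
    return [...groups.values()];
}

// Mimics Record2Processed: re-expand key2, keeping an entry when it is the only key2
// for that key1 or when it is not the '0' placeholder.
function expandKey2(collected) {
    const out = [];
    for (const g of collected) {
        for (const k of g.key2) {
            if (g.key2.length === 1 || k !== "0") {
                out.push({ key1: g.key1, key2: k, values: g.values });
            }
        }
    }
    return out;
}

const processed = expandKey2(collectByKey1(record2));
console.log(JSON.stringify(processed));
```

In this replay both value5 rows carry the values 3, 4 and 5, so the value6 row also picks up the value that belongs to value7. That is the case the first query in this answer avoids by grouping on key1 and key2 separately.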

Please let me know if any of the conditions above are expected, and I'll update the query logic.
