ksqlDB: explode multiple array elements

I have data like this:

One Kafka Message:

[{"source": "858256_6052+571", "numericValue": null, "created": 1725969039288, "textValue": "mytestData"}]

Another Kafka Message:

[{"source": "858256_6052+571", "numericValue": null, "created": 1725969039288, "textValue": "mytestData2"}, {"source": "858256_6052+571", "numericValue": null, "created": 1725969039288, "textValue": "mytestData3"}]

I want ksqlDB to create a stream with three rows for this. How can I do this? I already tried it with EXPLODE, but there I need a key that I do not have in my data.

My current query attempt:

CREATE STREAM ml_stream_raw (
    data ARRAY<STRUCT<source VARCHAR, numericValue DOUBLE, created BIGINT, textValue VARCHAR>>
) WITH (
    KAFKA_TOPIC='testml1withlist',
    VALUE_FORMAT='JSON'
);

CREATE STREAM ml_stream_raw_exploded AS
    SELECT EXPLODE(data) AS message
    FROM ml_stream_raw;

CREATE STREAM ml_stream_raw_processed AS
    SELECT 
        message->source AS source,
        message->numericValue AS numericValue,
        message->created AS created,
        message->textValue AS textValue
    FROM  ml_stream_raw_exploded;

CREATE TABLE ml_table_messages_per_second_list AS
    SELECT 'all_messages' AS message_group,
           WINDOWSTART AS window_start,
           COUNT(*) AS message_count
    FROM ml_stream_raw_processed
    WINDOW TUMBLING (SIZE 1 SECOND)
    GROUP BY 'all_messages';

SELECT TIMESTAMPTOSTRING(window_start, 'dd.MM.yyyy HH:mm:ss') AS window_start_formatted,
       message_count
FROM ml_table_messages_per_second_list
EMIT CHANGES;

You can do this with ksqlDB's EXPLODE table function; no extra key is needed. The statements below also cap the output at three elements per message:

CREATE STREAM ml_stream_raw (
    data ARRAY<STRUCT<source VARCHAR, numericValue DOUBLE, created BIGINT, textValue VARCHAR>>
) WITH (
    KAFKA_TOPIC='testml1withlist',
    VALUE_FORMAT='JSON'
);

CREATE STREAM ml_stream_exploded AS
    SELECT EXPLODE(SLICE(data, 1, 3)) AS message  -- keep at most the first three array elements per message
    FROM ml_stream_raw;

CREATE STREAM ml_stream_processed AS
    SELECT
        message->source AS source,
        message->numericValue AS numericValue,
        message->created AS created,
        message->textValue AS textValue
    FROM ml_stream_exploded;

Explanation:

  1. Create ml_stream_raw (unchanged):
  • This stream definition stays the same: it defines the schema for incoming messages as an array of structs.
  2. Create ml_stream_exploded:
  • SLICE(data, 1, 3) keeps at most the first three elements of each message's array (SLICE indices are 1-based and inclusive) and preserves their original order, so no partitioning or ordering key is needed.
  • EXPLODE then turns the sliced array into one output row per element, exposed as message.
  3. Create ml_stream_processed:
  • This stream dereferences the struct fields (message->source, message->numericValue, and so on) into ordinary columns, mirroring the two-step pattern from the original attempt.
  4. Remaining queries (unchanged):
  • You can keep the original ml_table_messages_per_second_list and the final SELECT used for message counting; just point the table at ml_stream_processed instead of ml_stream_raw_processed. The push-query sketch below is useful for checking the intermediate stream first.
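
Before wiring up the count table, you can sanity-check the intermediate stream with a push query. Given the two sample messages from the question, it should emit three rows in total (a sketch of the expected shape, not captured output):

SELECT source, textValue, created
FROM ml_stream_processed
EMIT CHANGES;

-- Expected rows for the sample messages (arrival order may vary):
-- 858256_6052+571 | mytestData  | 1725969039288
-- 858256_6052+571 | mytestData2 | 1725969039288
-- 858256_6052+571 | mytestData3 | 1725969039288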

Key Improvements:

  • This approach relies only on EXPLODE and SLICE, so it avoids the need for an explicit key, which isn't present in the data.
  • It preserves the original data within each element of the stream.
  • It ensures a maximum of three elements per message; the sketch below shows the SLICE step in isolation.
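
The three-element cap comes entirely from SLICE, and you can watch it in isolation with a push query against the raw stream (a minimal sketch; SLICE indices start at 1):

SELECT SLICE(data, 1, 3) AS first_three
FROM ml_stream_raw
EMIT CHANGES;

-- Arrays with three or fewer entries pass through unchanged;
-- longer arrays are trimmed to their first three elements.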

With these modifications, ksqlDB creates a stream containing at most three rows per Kafka message, one for each array element, in the order the elements appear within the message's array.
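
If you want to test the pipeline without producing to the topic first, the two sample messages can also be inserted straight from ksqlDB. This is a minimal sketch: the ARRAY and STRUCT constructors are standard ksqlDB syntax, but the explicit CAST(NULL AS DOUBLE) for the null numericValue is an assumption about how best to express that null here.

INSERT INTO ml_stream_raw (data) VALUES (
    ARRAY[STRUCT(source := '858256_6052+571',
                 numericValue := CAST(NULL AS DOUBLE),  -- assumed encoding of the null from the sample
                 created := 1725969039288,
                 textValue := 'mytestData')]
);

INSERT INTO ml_stream_raw (data) VALUES (
    ARRAY[
        STRUCT(source := '858256_6052+571', numericValue := CAST(NULL AS DOUBLE), created := 1725969039288, textValue := 'mytestData2'),
        STRUCT(source := '858256_6052+571', numericValue := CAST(NULL AS DOUBLE), created := 1725969039288, textValue := 'mytestData3')
    ]
);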