I want ksqlDB to create a stream of three elements for this. How can I do this? I already tried it with EXPLODE, but there I need a key that I do not have in my data.
My current query attempt:
CREATE STREAM ml_stream_raw (
data ARRAY<STRUCT<source VARCHAR, numericValue DOUBLE, created BIGINT, textValue VARCHAR>>
) WITH (
KAFKA_TOPIC='testml1withlist',
VALUE_FORMAT='JSON'
);
CREATE STREAM ml_stream_raw_exploded AS
SELECT EXPLODE(data) AS message
FROM ml_stream_raw;
CREATE STREAM ml_stream_raw_processed AS
SELECT
message->source AS source,
message->numericValue AS numericValue,
message->created AS created,
message->textValue AS textValue
FROM ml_stream_raw_exploded;
CREATE TABLE ml_table_messages_per_second_list AS
SELECT 'all_messages' AS message_group,
WINDOWSTART AS window_start,
COUNT(*) AS message_count
FROM ml_stream_raw_processed
WINDOW TUMBLING (SIZE 1 SECOND)
GROUP BY 'all_messages';
SELECT TIMESTAMPTOSTRING(window_start, 'dd.MM.yyyy HH:mm:ss') AS window_start_formatted,
message_count
FROM ml_table_messages_per_second_list
EMIT CHANGES;
CREATE STREAM ml_stream_raw (
data ARRAY<STRUCT<source VARCHAR, numericValue DOUBLE, created BIGINT, textValue VARCHAR>>
) WITH (
KAFKA_TOPIC='testml1withlist',
VALUE_FORMAT='JSON'
);
CREATE STREAM ml_stream_exploded AS
SELECT EXPLODE(
CASE
WHEN ARRAY_LENGTH(data) > 3 THEN SLICE(data, 1, 3) -- keep only the first three elements
ELSE data -- arrays with three or fewer elements are kept whole
END
) AS message
FROM ml_stream_raw;
CREATE STREAM ml_stream_processed AS
SELECT
message->source AS source,
message->numericValue AS numericValue,
message->created AS created,
message->textValue AS textValue
FROM ml_stream_exploded;
Explanation:
1. Create ml_stream_raw (Unchanged):
This stream definition remains the same, defining the schema for incoming messages with an array of objects.
2. Create ml_stream_exploded and ml_stream_processed:
SLICE:
SLICE(data, 1, 3) returns the first three elements of the data array; its indices start at 1 and include both endpoints.
The CASE expression applies SLICE only when ARRAY_LENGTH(data) is greater than 3, so SLICE is never asked for indices past the end of a shorter array.
EXPLODE:
This table function emits one row per remaining array element, named message. It works directly on the unkeyed stream, so no key is required.
Struct fields:
ksqlDB reads struct fields with the -> operator, so message->source and the other columns are extracted in a second stream, just as in your ml_stream_raw_processed.
3. Remaining Queries (Mostly Unchanged):
You can keep your original queries for creating ml_table_messages_per_second_list and the final SELECT statement for message counting; only change the table's FROM clause from ml_stream_raw_processed to ml_stream_processed.
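To see what the per-second count produces, here is a plain-Python model of the WINDOW TUMBLING (SIZE 1 SECOND) aggregation; it is not ksqlDB code. It assumes each row is assigned to a window by its record timestamp in milliseconds (ksqlDB uses the Kafka record timestamp, ROWTIME, unless the stream declares a TIMESTAMP column) and that tumbling windows are aligned to the Unix epoch, as in Kafka Streams. Rows produced by EXPLODE keep the timestamp of the message they came from, so all elements of one message fall into the same window. The function names and timestamps are hypothetical.

```python
from collections import Counter

# Hypothetical constant mirroring WINDOW TUMBLING (SIZE 1 SECOND).
WINDOW_SIZE_MS = 1000


def window_start(timestamp_ms: int, size_ms: int = WINDOW_SIZE_MS) -> int:
    """Start of the epoch-aligned tumbling window that contains timestamp_ms."""
    return timestamp_ms - timestamp_ms % size_ms


def count_per_window(timestamps_ms: list[int], size_ms: int = WINDOW_SIZE_MS) -> dict[int, int]:
    """COUNT(*) per window, keyed by window start (like message_count per window_start)."""
    counts = Counter(window_start(ts, size_ms) for ts in timestamps_ms)
    return dict(sorted(counts.items()))


if __name__ == "__main__":
    # Made-up timestamps: three exploded rows from one message, plus one row
    # from a message about a second later. The first window counts 3, the next 1.
    rows = [1700000000120, 1700000000120, 1700000000120, 1700000001050]
    print(count_per_window(rows))
```

Because every exploded row inherits its message's timestamp, message_count here counts array elements per second, not Kafka messages per second.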
Key Improvements:
This approach avoids the need for an explicit key in the data, which wasn’t readily available.
It preserves the original data within each element of the stream.
It ensures a maximum of three elements per message, adhering to the requirement.
With these modifications, ksqlDB creates a stream containing at most three elements for each Kafka message.
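The intended effect on a single Kafka message can be modeled in plain Python (again, not ksqlDB code): parse the JSON value, keep at most three elements of data, and emit one flat row per element with the four struct fields. The sample payload and the helper name explode_first_n are hypothetical, since the question does not show the actual data, and the sketch assumes elements are taken in array order.

```python
import json

# Hypothetical value for the 'testml1withlist' topic; the real payload is not
# shown in the question, so all field values here are made up.
SAMPLE_VALUE = json.dumps({
    "data": [
        {"source": "s1", "numericValue": 1.5, "created": 1700000000000, "textValue": "a"},
        {"source": "s2", "numericValue": 2.5, "created": 1700000000100, "textValue": "b"},
        {"source": "s3", "numericValue": 3.5, "created": 1700000000200, "textValue": "c"},
        {"source": "s4", "numericValue": 4.5, "created": 1700000000300, "textValue": "d"},
    ]
})

MAX_ELEMENTS = 3


def explode_first_n(value_json: str, n: int = MAX_ELEMENTS) -> list[dict]:
    """Hypothetical helper: one output row per element, for at most n elements."""
    # A missing or null "data" array produces no rows.
    data = json.loads(value_json).get("data") or []
    # Python slicing stops at the end of a shorter array, so fewer than n
    # elements simply yields fewer rows. No key is needed anywhere.
    return [
        {
            "source": element.get("source"),
            "numericValue": element.get("numericValue"),
            "created": element.get("created"),
            "textValue": element.get("textValue"),
        }
        for element in data[:n]
    ]


if __name__ == "__main__":
    for row in explode_first_n(SAMPLE_VALUE):
        print(row)
```

With the four-element sample, only the rows for s1, s2 and s3 are produced; a message with one element yields one row.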