IBM Event Streams and IBM Event Automation


Processing click tracking events using Flink SQL in Event Processing

By Dale Lane posted 28 days ago


In this post, I introduce a few core Flink SQL functions using worked examples of processing a stream of click tracking events from a retail website.

I find that a practical, real-world(ish) example can help to explain how to use Flink SQL in a way that abstract descriptions, such as processing streams of coloured blocks, sometimes don't quite achieve.

I'll use this post to give examples of my most-used Flink SQL functions, in the context of a retail scenario: a stream of events from customers on the website for a clothing retailer.

0 Consuming click tracking events

Demonstrating how to consume Avro events and connect to a schema registry to fetch schemas as needed.

The Event Processing project for this example is just a single event source node.

I want Flink to dynamically fetch schemas on-demand for the events that it consumes, so I provided connection details for a Confluent-compatible schema registry API.

I still need to define the table for Flink, which Event Processing generated by fetching the current version of the schema for the topic and converting the Avro schema (see below) into the equivalent SQL table definition.

I've added a few comments to the SQL generated for this example for readability.

-- ----------------------------------------------------------------------------
-- Event source : "click stream events"
--
-- define the Kafka clicktracking topic as a source of events to process
-- ----------------------------------------------------------------------------

CREATE TABLE `click stream events`
(
    -- SQL table representation of the Avro schema
    sessionid     STRING NOT NULL,
    eventid       STRING NOT NULL,
    type          STRING NOT NULL,
    context       ROW<
                        device ROW<
                            type       STRING NOT NULL,
                            os         STRING NOT NULL,
                            resolution STRING NOT NULL
                        > NOT NULL,
                        browser ROW<
                            name      STRING NOT NULL,
                            version   STRING NOT NULL,
                            useragent STRING NOT NULL,
                            enabled ROW<
                                cookies    BOOLEAN NOT NULL,
                                javascript BOOLEAN NOT NULL
                            > NOT NULL
                        > NOT NULL,
                        ipaddress  STRING NOT NULL,
                        donottrack BOOLEAN NOT NULL
                    > NOT NULL,
    referrer      STRING,
    customer      ROW<
                        id     STRING NOT NULL,
                        name   STRING NOT NULL,
                        emails ARRAY<STRING NOT NULL> NOT NULL
                    >,
    url           STRING NOT NULL,
    product       STRING,
    `timestamp`   STRING NOT NULL,

    -- timestamp taken from the Kafka events
    event_time    TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
    WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND
)
WITH (
    'connector' = 'kafka',
    'topic' = 'CLICKTRACKING.REG',

    'properties.group.id' = 'flink-0-setup',
    'scan.startup.mode' = 'earliest-offset',

    'properties.sasl.mechanism' = 'SCRAM-SHA-512',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.bootstrap.servers' = 'my-kafka-cluster-kafka-bootstrap.event-automation.svc:9095',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-demo-apps" password="OgmLKWuu86BWKQCtvSFl7h3oDxkc8FOD";',
    'properties.ssl.endpoint.identification.algorithm' = '',

    'value.format' = 'avro-confluent',
    'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
    'value.avro-confluent.basic-auth.user-info' = 'es-schemas:es-schemas',
    'value.avro-confluent.url' = 'https://my-kafka-cluster-ibm-es-ac-reg-external-event-automation.apps.dale-lane.cp.fyre.ibm.com/apis/ccompat/v7'
);

0-setup.sql

(If you use this, you'll need to replace the password, but otherwise the SQL will work as-is if you've used the same setup as me.)

Running this SQL isn't super exciting, but it gets us started.

1 Identifying marketing campaign effectiveness

Demonstrating how to transform events by deriving marketing campaign properties from query parameters in click event URLs.

The Event Processing project for this builds on the previous example by adding a Transform node.

Click events contain the URL that was the origin of the click event. This example demonstrates how to use some of Flink's built-in functions by chopping up that URL to extract marketing campaign properties.

-- ----------------------------------------------------------------------------
-- Transform : "marketing campaign details"
--
-- extract digital marketing campaign details from the URL event property
-- ----------------------------------------------------------------------------

CREATE TEMPORARY VIEW `extract marketing campaign details` AS
    SELECT
        -- timestamp for the clickstream event
        event_time,

        -- properties of the click event
        sessionid, eventid,
        type,
        context,
        referrer,
        customer,
        product,

        -- URL that is the source of the click event
        --  which can include tracking for digital marketing campaign in
        --  query parameters
        url,
        -- true/false - if digital marketing properties are present
        REGEXP( PARSE_URL(url, 'QUERY'), '(^|&)utm_' ) AS digital_marketing,
        -- extract individual digital marketing campaign properties
        PARSE_URL( url, 'QUERY', 'utm_source' )        AS utm_source,
        PARSE_URL( url, 'QUERY', 'utm_medium' )        AS utm_medium,
        PARSE_URL( url, 'QUERY', 'utm_campaign' )      AS utm_campaign,
        PARSE_URL( url, 'QUERY', 'utm_content' )       AS utm_content,
        PARSE_URL( url, 'QUERY', 'utm_term' )          AS utm_term

    FROM
        `click stream events`;

1-transform.sql

The interesting bits are lines 91-96, which are a nice example of how Flink has functions to solve a wide variety of data parsing and processing use cases.

The screenshot above from running the flow in Event Processing shows a comparison between the raw URL from the click tracking event and the different utm_ parameters that were extracted from it.

(Many of them are null because not all URLs include marketing campaign properties.)

Adding an additional filter for digital_marketing IS TRUE lets me show only the click tracking events that contained a URL with marketing campaign properties.
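Under the hood, PARSE_URL and REGEXP are doing ordinary URL parsing. As a rough illustration of the same logic outside Flink, here is a Python sketch using only the standard library (the function name is mine):

```python
import re
from urllib.parse import urlparse, parse_qs

def extract_campaign(url: str) -> dict:
    # the query string, as returned by PARSE_URL(url, 'QUERY')
    query = urlparse(url).query
    params = parse_qs(query)
    # PARSE_URL(url, 'QUERY', 'utm_...') returns the value, or NULL if absent
    campaign = {k: params.get(k, [None])[0]
                for k in ("utm_source", "utm_medium", "utm_campaign",
                          "utm_content", "utm_term")}
    # REGEXP(..., '(^|&)utm_'): true if any utm_ parameter is present
    campaign["digital_marketing"] = bool(re.search(r"(^|&)utm_", query))
    return campaign
```

For example, `extract_campaign("https://example.com/p?utm_source=newsletter&utm_medium=email")` would report `utm_source` as `newsletter`, `utm_medium` as `email`, and `digital_marketing` as true.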

2 Click tracking activity by new customers

Demonstrating an interval join to correlate between related streams of events, by identifying click tracking events that occur within a short time of a new customer registration event for the same user.

The Event Processing project contains two event source nodes (one for a click tracking topic, the second for a customer registrations topic) and an Interval Join node to correlate events between them.

The assistant was helpful here in configuring the interval join. I needed to decide on the time window to use (i.e. how soon after a new customer registers I'm interested in seeing their click tracking events).

The assistant also helped me choose an appropriate join type. In this case, I wanted an inner join - I want to see only click tracking events where a customer registration event was observed within the time window.

Finally, the assistant helped me define the format for the output I wanted - choosing (and renaming) the properties from the two different input streams to keep.

As before, I've added comments to the generated SQL to make it more readable.

-- ----------------------------------------------------------------------------
-- Interval join : "click streams by new users"
--
-- correlate click events with customer signup events for the same customer id
-- ----------------------------------------------------------------------------

CREATE TEMPORARY VIEW `click streams by new users` AS
    SELECT
        -- timestamps for the two streams of events
        `new customer registrations`.event_time AS registration_time,
        `click stream events`.event_time        AS click_time,
        -- customer details
        CAST( ROW(`click stream events`.customer.id) AS ROW<id STRING> ) AS customer,
        `new customer registrations`.customername AS customername,
        -- click event type
        `click stream events`.type  AS type,
        -- product info associated with the event
        `click stream events`.product AS product
    FROM
        `new customer registrations`

    JOIN
        `click stream events`
        ON
            `click stream events`.customer.id = `new customer registrations`.customerid
        AND
            `click stream events`.event_time
                BETWEEN
                    `new customer registrations`.event_time
                        AND
                    `new customer registrations`.event_time + INTERVAL '1' HOUR;

2-interval-join.sql

The interesting bits are lines 122-134, which are an example of how to use an interval join to correlate two streams of events.

The screenshot above from running the flow in Event Processing shows just click tracking events for customers within the first hour after they create their new account.

3 Browser and device usage

Demonstrating how to aggregate events within a tumble window to determine the types of device (desktop, tablet, mobile) used each hour.

Demonstrating how to use a Top-N query to identify the most-used web browser in each hour.

The Event Processing project for this example contains two different tumble window aggregate nodes - one to count device types per hour, the second to count browser names per hour. The second aggregate also has a Top-N node to identify the most used browser name in each hour.

All click tracking events contain a sessionid. This can be used to correlate multiple events from a single user as part of the same overall user session.

The first aggregate is a tumble window to count the number of different session ids that occur in each hour, grouped by the type of device (i.e. desktop, mobile, tablet).

When the flow is running, it will output three events at the end of each hour - one for each device type (desktop, mobile, tablet) with a count of how many unique sessions have been observed with that device type during the hour.

-- ----------------------------------------------------------------------------
-- Aggregate : "device type usage"
--
-- count the number of sessions for each type of device (e.g. mobile / desktop / tablet)
-- ----------------------------------------------------------------------------

CREATE TEMPORARY VIEW `device type usage` AS
    SELECT
        COUNT(DISTINCT sessionid) AS `number of sessions`,
        context.device.type       AS `device type`,
        window_start              AS `start`,
        window_end                AS `end`,
        window_time               AS `result`

    FROM
        TABLE (
            TUMBLE(
                TABLE `click stream events`,
                DESCRIPTOR(event_time),
                INTERVAL '1' HOUR
            )
        )

    GROUP BY
        window_start,
        window_end,
        window_time,
        context.device.type;

3-aggregate-tumble.sql

The first aggregate (number of sessions for each device type) is in lines 71-92. I've added some comments to explain what it is doing.
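To make the windowing behaviour concrete, here is a simplified batch analogue in Python of what the tumble window computes (field names are illustrative; Flink does this incrementally over the stream rather than over a list):

```python
from collections import defaultdict
from datetime import datetime, timezone

def device_usage(events):
    """Batch analogue of the hourly tumble window: count distinct
    sessionids per (hour window, device type)."""
    # (window_start, device_type) -> set of sessionids
    windows = defaultdict(set)
    for e in events:
        ts = datetime.fromtimestamp(e["event_time"], tz=timezone.utc)
        # truncate to the start of the hour-long tumble window
        window_start = ts.replace(minute=0, second=0, microsecond=0)
        windows[(window_start, e["device_type"])].add(e["sessionid"])
    # COUNT(DISTINCT sessionid) per group
    return {k: len(v) for k, v in windows.items()}
```

Each hour-long window is independent: an event belongs to exactly one window, and a session seen twice in the same window is only counted once.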

The second aggregate is another tumble window to count the number of different session ids that occur in each hour, grouped by the browser name (e.g. Chrome, Firefox, Safari, etc.).

The Top-N query is added to the second aggregate, and configured so that the three browser names with the highest number of sessions are emitted at the end of each hour window.

When the flow is running, it will output three events at the end of each hour - one for each of the three most-used browsers, with a count of how many unique sessions have been observed with that browser during the hour.

-- ----------------------------------------------------------------------------
-- Aggregate : "browser usage"
--
-- count the number of sessions for each type of web browser (e.g. Chrome / Safari / etc.)
-- ----------------------------------------------------------------------------

CREATE TEMPORARY VIEW `browser usage` AS
    SELECT
        COUNT(DISTINCT sessionid) AS `number of sessions`,
        context.browser.name      AS browser,
        window_start              AS `start`,
        window_end                AS `end`,
        window_time               AS `result`

    FROM
        TABLE (
            TUMBLE(
                TABLE `click stream events`,
                DESCRIPTOR(event_time),
                INTERVAL '1' HOUR
            )
        )

    GROUP BY
        window_start,
        window_end,
        window_time,
        context.browser.name;



-- ----------------------------------------------------------------------------
-- Top-N : "most used browsers"
--
-- at the end of each hour, emits the 3 browsers with the highest number of sessions
-- ----------------------------------------------------------------------------

CREATE TEMPORARY VIEW `most used browsers` AS
    SELECT
        `start`,
        `rank`,
        browser,
        `number of sessions`

    FROM
    (
        SELECT
            `start`,
            ROW_NUMBER() OVER (
                PARTITION BY
                    `start`, `end`
                ORDER BY
                    `number of sessions` DESC
            ) AS `rank`,
            browser,
            `number of sessions`

        FROM
            `browser usage`
    )

    WHERE
        `rank` <= 3;

3-aggregate-tumble.sql

The second aggregate (most used browser for each hour) is in lines 102-158. I've added some comments to explain what it is doing.

4 User session duration

Demonstrating how to aggregate events using a session window to identify click tracking events from the same user as part of the same user session.

Demonstrating how to aggregate those user sessions using a tumble window to identify the attributes of an average user session.

The Event Processing project for this example uses two aggregate nodes.

The first aggregate node collects individual click events into complete user sessions - all of the clicks that a user performed as part of a single active session.

The SQL for this first aggregate is at 4-aggregate-session.sql (lines 72-100) with comments added to explain what it is doing.

-- ----------------------------------------------------------------------------
-- Aggregate : "sessions"
--
-- groups individual click stream events that are part of the same user session
-- ----------------------------------------------------------------------------

CREATE TEMPORARY VIEW sessions AS
    SELECT
        -- correlation id for all click events that are part of the same user session
        sessionid,
        -- how long the session lasted in seconds
        TIMESTAMPDIFF (
            SECOND,
            window_start,
            window_end - INTERVAL '15' MINUTES
        )                                  AS duration_secs,
        -- when the session occurred
        window_time                        AS sessions_time

    FROM
        TABLE (
            SESSION (
                TABLE `click stream events`
                PARTITION BY sessionid,
                DESCRIPTOR(event_time),
                INTERVAL '15' MINUTES
            )
        )

    GROUP BY
        sessionid,
        window_start,
        window_end,
        window_time;

I'm calculating the duration of each user session by using a TIMESTAMPDIFF function to compute the difference between the timestamps for the first and last click tracking event observed with each sessionid.

The result from this is that every time a session ends (which I've defined as no events with that session id being received for 15 minutes), an event is emitted with the duration of that session.
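The `window_end - INTERVAL '15' MINUTES` arithmetic deserves a note: for a session window with a 15-minute gap, window_end is the timestamp of the last event plus the gap, so subtracting the gap recovers the time of the last event. A small Python sketch of the calculation (illustrative values only):

```python
from datetime import datetime, timedelta

GAP = timedelta(minutes=15)  # the session window's inactivity gap

def session_duration_secs(first_event: datetime, last_event: datetime) -> int:
    # Flink's session window: window_start = first event's timestamp,
    # window_end = last event's timestamp + gap
    window_start = first_event
    window_end = last_event + GAP
    # TIMESTAMPDIFF(SECOND, window_start, window_end - INTERVAL '15' MINUTES)
    return int((window_end - GAP - window_start).total_seconds())
```

So a session whose first and last clicks are five minutes apart reports a duration of 300 seconds, not 1200.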

The second aggregate node takes those individual session durations, and counts the number of sessions, identifies the longest and shortest session that was recorded during each hour, and calculates the average session duration.

The SQL for this second aggregate is at 4-aggregate-session.sql (lines 109-133) with comments added to explain what it is doing.

-- ----------------------------------------------------------------------------
-- Aggregate : "average sessions"
--
-- computes characteristics of average user sessions
-- ----------------------------------------------------------------------------

CREATE TEMPORARY VIEW `average sessions` AS
    SELECT
        -- average session properties
        COUNT(sessionid)   AS `number of sessions per hour`,
        MIN(duration_secs) AS `shortest session (secs)`,
        MAX(duration_secs) AS `longest session (secs)`,
        AVG(duration_secs) AS `average session (secs)`,
        -- time window
        window_start AS aggregateStartTime,
        window_end   AS aggregateEndTime,
        window_time  AS aggregateResultTime

    FROM
        TABLE (
            TUMBLE(
                TABLE sessions,
                DESCRIPTOR(sessions_time),
                INTERVAL '1' HOUR
            )
        )

    GROUP BY
        window_start,
        window_end,
        window_time;

The result from this is that at the end of every hour, an event is emitted with the number of user sessions observed in the previous hour of click tracking events, the shortest and longest user session, and the average user session duration.

5 Abandoned baskets

Demonstrating how to filter sessions that match certain criteria to identify user sessions that included adding products to a shopping cart but that did not result in a purchase.

The Event Processing project for this example collects click tracking events with the same sessionid into user sessions, and identifies attributes of that session - such as the number of products added to the shopping cart during the session, and whether or not checkout events were included within the session. It then adds a filter node so that only sessions with an abandoned basket are kept.

The SQL to define the session in this example is at 5-aggregate-session.sql in lines 72-125.

-- ----------------------------------------------------------------------------
-- Aggregate : "sessions"
--
-- groups individual click stream events that are part of the same user session
-- ----------------------------------------------------------------------------

CREATE TEMPORARY VIEW sessions AS
    SELECT
        -- correlation id for all click events that are part of the same user session
        sessionid,

        -- how long the session lasted in seconds
        TIMESTAMPDIFF(
            SECOND,
            window_start,
            window_end - INTERVAL '15' MINUTES
        )                                                                AS duration_secs,

        -- list of products added to the cart during the session
        --  defined as the array of products added to the cart during the session
        --   minus the products removed from the cart
        ARRAY_EXCEPT(
            -- products added to the cart
            ARRAY_AGG(CASE WHEN type = 'ADD_TO_CART'
                            THEN product
                            ELSE NULL
                            END IGNORE NULLS),
            -- products removed from the cart
            ARRAY_AGG(CASE WHEN type = 'REMOVE_FROM_CART'
                            THEN product
                            ELSE NULL
                            END IGNORE NULLS))                           AS products,

        -- TRUE if the session included a checkout-complete event, FALSE otherwise
        MAX(CASE WHEN type = 'CHECKOUT_COMPLETE' THEN 1 ELSE 0 END) = 1  AS checked_out,

        -- ID for the customer - may not be contained in all click events
        --  contained in the session if they only logged in during the session
        --  but it must be present in click events by the time a checkout is completed
        ARRAY_AGG(DISTINCT customer.id IGNORE NULLS)[1]                  AS customer_id,

        -- when the session occurred
        window_time                                                      AS sessions_time

    FROM
        TABLE (
            SESSION (
                TABLE `click stream events`
                PARTITION BY sessionid,
                DESCRIPTOR(event_time),
                INTERVAL '15' MINUTES
            )
        )

    GROUP BY
        sessionid,
        window_start,
        window_end,
        window_time;

5-aggregate-session.sql

The session window let me collect together all individual click tracking events that have the same sessionid value - that are all part of the same overall user session. This aggregate SQL demonstrates a few of the functions that can be applied to the events in each session:

  • TIMESTAMPDIFF - to compute the duration of the session
  • ARRAY_AGG - to collect a list of the products added to the shopping basket across multiple separate events within the session
  • ARRAY_EXCEPT - to remove products that were removed from the shopping basket from that collected list
  • MAX and CASE - to identify whether a checkout-complete event was contained in the session
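A rough Python analogue of this per-session roll-up (the function and field names are mine, and the ARRAY_EXCEPT semantics are approximated with a list comprehension):

```python
def summarise_session(events):
    """Per-session roll-up mirroring the aggregate's ARRAY_AGG /
    ARRAY_EXCEPT / MAX+CASE logic. `events` is a list of dicts with
    'type', 'product' and optional 'customer_id' keys (illustrative)."""
    # ARRAY_AGG with CASE ... IGNORE NULLS: collect products per event type
    added   = [e["product"] for e in events if e["type"] == "ADD_TO_CART"]
    removed = [e["product"] for e in events if e["type"] == "REMOVE_FROM_CART"]
    # ARRAY_EXCEPT: keep added products that were not later removed
    products = [p for p in added if p not in removed]
    # MAX(CASE ...) = 1: did any event in the session complete a checkout?
    checked_out = any(e["type"] == "CHECKOUT_COMPLETE" for e in events)
    # ARRAY_AGG(DISTINCT customer.id IGNORE NULLS)[1]: first non-null id
    customer_ids = {e["customer_id"] for e in events if e.get("customer_id")}
    return {
        "products": products,
        "checked_out": checked_out,
        "customer_id": next(iter(customer_ids), None),
    }
```

The MAX-over-CASE trick is a common SQL idiom for "does any row in the group satisfy this predicate" - the Python equivalent is simply `any()`.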

The result from this is that every time a session ends (which I've defined as no events with that session id being received for 15 minutes), an event is emitted with details from that user session, such as how long the session lasted, the customer id, the contents of their shopping basket at the end of the session, and whether or not they made a purchase.

These session properties can be used in a filter to just keep events from the end of a session that resulted in an abandoned basket:

  • duration of at least 60 seconds - to ignore users who clicked away very quickly
  • at least one product in their shopping basket
  • no checkout complete event
  • logged in user

The SQL to do this is at 5-aggregate-session.sql in lines 143-161.

-- ----------------------------------------------------------------------------
-- Filter : "abandoned baskets"
--
-- identifies user sessions that match the criteria for an abandoned shopping cart
-- ----------------------------------------------------------------------------

CREATE TEMPORARY VIEW `abandoned baskets` AS
    SELECT
        sessions_time,
        sessions.customer_id AS customer_id,
        sessions.products    AS products

    FROM
        sessions

    WHERE
        -- ignore sessions that are less than a minute long
        duration_secs > 60

            AND

        -- sessions with at least one item in the basket when the
        --  last click event was emitted
        CARDINALITY(products) > 0

            AND

        -- sessions that did not include a checkout-complete event
        checked_out IS FALSE

            AND

        -- sessions where the user was logged in
        customer_id IS NOT NULL;

5-aggregate-session.sql

The result from this is that when a logged-in user abandons their session (which I've defined as no click events observed for 15 minutes), an event is emitted with their customer id and the contents of their shopping basket.

This could be used to trigger some automated promotional activity for that customer.

6 Buying behaviour

Demonstrating how to recognize sequences of events that match a defined pattern to identify different shopper behaviours in a stream of click tracking events.

The Event Processing project for this example contains two different pattern detection nodes:

  • looking for a sequence of click tracking events that indicates a customer making a purchase following searching for specific items
  • looking for a sequence of click tracking events that indicates a customer making a purchase following browsing lists of products within a category

These are then combined into a single stream of purchase events, enriched with a label that describes the type of behaviour suggested by the click tracking events that led to the purchase.

The SQL to recognise searching behaviour is at 6-match-recognize.sql in lines 96-128.

The simplified sequence I've defined is a search, followed by adding something to the shopping basket, followed by completing a purchase.

-- ----------------------------------------------------------------------------
-- Match recognize : "search driven purchases"
--
-- identifies user sessions that result in a purchase where products were
--  added to the cart following searches
-- ----------------------------------------------------------------------------

CREATE TEMPORARY VIEW `search driven purchases` AS
    SELECT
        sessionid,

        -- identifies pattern when combined in the following union
        CAST('search purchase' AS STRING) AS buy_pattern,

        -- time that the purchase was completed
        checkout_complete_time,

        -- useful for debugging - the complete set of click events emitted for the session
        --  (starting from the first search)
        ARRAY_CONCAT(
            search_events,
            other_events,
            ARRAY['ADD_TO_CART'],
            following_events,
            ARRAY['CHECKOUT_COMPLETE']
        ) AS all_events

    FROM
        `click stream events`

    MATCH_RECOGNIZE (
        PARTITION BY sessionid
        ORDER BY event_time
        MEASURES
            CHECKOUT.event_time       AS checkout_complete_time,
            ARRAY_AGG(SEARCHING.type) AS search_events,
            ARRAY_AGG(OTHER.type)     AS other_events,
            ARRAY_AGG(ANYTHING.type)  AS following_events

        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW

        PATTERN (
            -- user performs a search
            SEARCHING+?
                -- optionally other actions
                OTHER*
            -- adds an item to their basket
            ADD
                -- optionally other actions,
                --  including additional searches and adds
                ANYTHING*
            -- completes a purchase
            CHECKOUT
        )
        DEFINE
            SEARCHING AS SEARCHING.type = 'SEARCH',
            ADD       AS ADD.type       = 'ADD_TO_CART',
            CHECKOUT  AS CHECKOUT.type  = 'CHECKOUT_COMPLETE',
            OTHER     AS OTHER.type     IN ('PAGE_VIEW','PRODUCT_VIEW','REMOVE_FROM_CART','CART_VIEW','CHECKOUT_START','LOGIN'),
            ANYTHING  AS ANYTHING.type  IN ('SEARCH','ADD_TO_CART','PAGE_VIEW','PRODUCT_VIEW','REMOVE_FROM_CART','CART_VIEW','CHECKOUT_START','LOGIN')

    ) AS search_purchases;
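One way to think about the MATCH_RECOGNIZE pattern is as a regular expression over the sequence of event types in a session. A rough Python analogue of the "search driven purchases" pattern (the one-character codes are my own shorthand; this ignores the per-session partitioning and event-time ordering that Flink handles):

```python
import re

# one-character code per click event type (illustrative shorthand)
CODES = {"SEARCH": "S", "ADD_TO_CART": "A", "CHECKOUT_COMPLETE": "C",
         "PAGE_VIEW": "P", "PRODUCT_VIEW": "V", "REMOVE_FROM_CART": "R",
         "CART_VIEW": "W", "CHECKOUT_START": "K", "LOGIN": "L"}

# SEARCHING+? OTHER* ADD ANYTHING* CHECKOUT, as in the PATTERN clause
SEARCH_PURCHASE = re.compile(r"S+?[PVRWKL]*A[SAPVRWKL]*C")

def is_search_driven_purchase(event_types):
    """True if the ordered event types for a session match the pattern."""
    encoded = "".join(CODES[t] for t in event_types)
    return SEARCH_PURCHASE.search(encoded) is not None
```

The quantifiers carry over directly: `SEARCHING+?` is a reluctant one-or-more, `OTHER*` and `ANYTHING*` are greedy zero-or-more, just as in the MATCH_RECOGNIZE PATTERN clause.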

The SQL to recognize browsing behaviour is at 6-match-recognize.sql in lines 162-194.

The simplified sequence I've defined is a browse, followed by adding something to the shopping basket, followed by completing a purchase.

-- ----------------------------------------------------------------------------
-- Match recognize : "browsing driven purchases"
--
-- identifies user sessions that result in a purchase where products were
--  added to the cart following browsing product category pages
-- ----------------------------------------------------------------------------

CREATE TEMPORARY VIEW `browsing driven purchases` AS
    SELECT
        sessionid,

        -- identifies pattern when combined in the following union
        CAST('browsing purchase' AS STRING) AS buy_pattern,

        -- time that the purchase was completed
        checkout_complete_time,

        -- useful for debugging - the complete set of click events emitted for the session
        --  (starting from the first browse)
        ARRAY_CONCAT(
            browse_events,
            other_events,
            ARRAY['ADD_TO_CART'],
            following_events,
            ARRAY['CHECKOUT_COMPLETE']
        ) AS all_events

    FROM
        `click stream events`

    MATCH_RECOGNIZE (
        PARTITION BY sessionid
        ORDER BY event_time
        MEASURES
            CHECKOUT.event_time      AS checkout_complete_time,
            ARRAY_AGG(BROWSE.type)   AS browse_events,
            ARRAY_AGG(OTHER.type)    AS other_events,
            ARRAY_AGG(ANYTHING.type) AS following_events

        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW

        PATTERN (
            -- user is browsing
            BROWSE+?
                -- optionally other actions
                OTHER*
            -- adds an item to their basket
            ADD
                -- optionally other actions,
                --  including additional browses and adds
                ANYTHING*
            -- completes a purchase
            CHECKOUT
        )
        DEFINE
            BROWSE    AS BROWSE.type    = 'PAGE_VIEW',
            ADD       AS ADD.type       = 'ADD_TO_CART',
            CHECKOUT  AS CHECKOUT.type  = 'CHECKOUT_COMPLETE',
            OTHER     AS OTHER.type     IN ('PRODUCT_VIEW','REMOVE_FROM_CART','CART_VIEW','CHECKOUT_START','LOGIN'),
            ANYTHING  AS ANYTHING.type  IN ('ADD_TO_CART','PAGE_VIEW','PRODUCT_VIEW','REMOVE_FROM_CART','CART_VIEW','CHECKOUT_START','LOGIN')

    ) AS browse_purchases;

More complex and advanced patterns can be implemented - these are simple examples to illustrate what is possible.

-- ----------------------------------------------------------------------------
-- Union : "purchases"
--
-- combined set of purchases where a pattern of behaviour was recognized
-- ----------------------------------------------------------------------------

CREATE TEMPORARY VIEW purchases AS
    SELECT * FROM `search driven purchases`
        UNION ALL
    SELECT * FROM `browsing driven purchases`;

6-match-recognize.sql

The results from each of these are then combined together using a UNION on line 206.

The result is a stream of purchase events, enriched with a label that describes the type of behaviour suggested by the click tracking events that led to the purchase.

The data

I'm processing Avro-encoded events in these examples, so looking at the raw data as in this screenshot isn't super helpful.

If you're familiar with Avro, you can look at the Avro schema for the click tracking events, which includes a description of each of the fields.

{
    "namespace": "com.loosehangerjeans",
    "type": "record",
    "name": "ClickEvent",
    "doc": "Clickstream events for user interactions with the Loosehanger website",
    "fields": [
        {
            "name": "sessionid",
            "type": {
                "type": "string",
                "pattern": "^sess_[a-z0-9]{16}$"
            },
            "doc": "ID for the session that this event is a part of. This is a useful ID for correlating individual clicktracking events - all click events that are part of the same user session will have the same session id."
        },
        {
            "name": "eventid",
            "type": {
                "type": "string",
                "pattern": "^[a-z0-9]{12}$"
            },
            "doc": "Unique event identifier for this individual click tracking event."
        },
        {
            "name": "type",
            "type": "string",
            "doc": "Action that the user has taken. Valid values: PAGE_VIEW, SEARCH, PRODUCT_VIEW, ADD_TO_CART, REMOVE_FROM_CART, CART_VIEW, CHECKOUT_START, CHECKOUT_COMPLETE, LOGIN"
        },
        {
            "name": "context",
            "type": {
                "type": "record",
                "name": "UserContext",
                "doc": "Information about the device that the user is using to access the website",
                "fields": [
                    {
                        "name": "device",
                        "type": {
                            "type": "record",
                            "name": "Device",
                            "doc": "Device information",
                            "fields": [
                                {
                                    "name": "type",
                                    "type": "string",
                                    "doc": "Device class. Valid values: Desktop, Mobile, Tablet"
                                },
                                {
                                    "name": "os",
                                    "type": "string",
                                    "doc": "Operating system"
                                },
                                {
                                    "name": "resolution",
                                    "type": "string",
                                    "doc": "Screen resolution - in the form of width x height (e.g. 1920x1080)"
                                }
                            ]
                        },
                        "doc": "Information about the user's device"
                    },
                    {
                        "name": "browser",
                        "type": {
                            "type": "record",
                            "name": "Browser",
                            "doc": "Browser information",
                            "fields": [
                                {
                                    "name": "name",
                                    "type": "string",
                                    "doc": "Browser name (e.g. Chrome)"
                                },
                                {
                                    "name": "version",
                                    "type": "string",
                                    "doc": "Browser version (e.g. 121.0.0.0)"
                                },
                                {
                                    "name": "useragent",
                                    "type": "string",
                                    "doc": "Full user agent string"
                                },
                                {
                                    "name": "enabled",
                                    "type": {
                                        "type": "record",
                                        "name": "BrowserFeatures",
                                        "doc": "Flags indicating which browser features are enabled",
                                        "fields": [
                                            {
                                                "name": "cookies",
                                                "type": "boolean",
                                                "doc": "Whether cookies are enabled"
                                            },
                                            {
                                                "name": "javascript",
                                                "type": "boolean",
                                                "doc": "Whether JavaScript is enabled"
                                            }
                                        ]
                                    },
                                    "doc": "Browser features that are enabled"
                                }
                            ]
                        },
                        "doc": "Information about the user's browser"
                    },
                    {
                        "name": "ipaddress",
                        "type": "string",
                        "doc": "IP address that the user is connecting from"
                    },
                    {
                        "name": "donottrack",
                        "type": "boolean",
                        "doc": "Whether the user has Do Not Track enabled"
                    }
                ]
            },
            "doc": "Context information about the device being used"
        },
        {
            "name": "referrer",
            "type": [
                "null",
                "string"
            ],
            "default": null,
            "doc": "Referring URL. This will be null for requests referred by other pages on the Loosehanger website, or for direct accesses."
        },
        {
            "name": "customer",
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "Customer",
                    "doc": "Customer information",
                    "fields": [
                        {
                            "name": "id",
                            "type": {
                                "type": "string",
                                "logicalType": "uuid"
                            },
                            "doc": "Unique customer identifier"
                        },
                        {
                            "name": "name",
                            "type": "string",
                            "doc": "Customer name"
                        },
                        {
                            "name": "emails",
                            "type": {
                                "type": "array",
                                "items": "string"
                            },
                            "doc": "Customer email addresses"
                        }
                    ]
                }
            ],
            "default": null,
            "doc": "Customer information. This is optional, and only present for logged-in users. This will be null for anonymous users."
        },
        {
            "name": "url",
            "type": "string",
            "doc": "Full URL of the page that was the source of the event"
        },
        {
            "name": "product",
            "type": [
                "null",
                "string"
            ],
            "default": null,
            "doc": "Product name. This will only be included for events related to shopping carts - adding/removing products from the user's basket."
        },
        {
            "name": "timestamp",
            "type": "string",
            "doc": "Event timestamp in ISO 8601 format with milliseconds"
        }
    ]
}

schema.avro
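To get a quick overview of that structure, the schema can be summarised programmatically. This is just an illustrative Python sketch, using a trimmed subset of the schema above (not the full schema):

```python
import json

# trimmed subset of the ClickEvent schema above, for illustration only
schema = json.loads("""
{
    "name": "ClickEvent",
    "type": "record",
    "fields": [
        {"name": "sessionid", "type": "string",
         "doc": "ID for the session that this event is a part of"},
        {"name": "type", "type": "string",
         "doc": "Action that the user has taken"},
        {"name": "referrer", "type": ["null", "string"], "default": null,
         "doc": "Referring URL"}
    ]
}
""")

for field in schema["fields"]:
    # Avro union types such as ["null", "string"] indicate optional fields
    optional = isinstance(field["type"], list) and "null" in field["type"]
    print(f"{field['name']:<10} optional={optional}")
```

Running this shows that sessionid and type are required, while referrer is optional - the same pattern that the full schema uses for customer and product.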

To make it easier to understand, I ran kafka-console-consumer on the topic using an Avro formatter with jq to prepare these more readable JSON representations.

kafka-console-consumer.sh \
  --bootstrap-server  my-kafka-cluster-bootstrap-event-automation.apps.dale-lane.cp.fyre.ibm.com:443 \
  --topic             CLICKTRACKING.REG \
  --consumer.config   es.properties \
  --formatter         com.ibm.eventautomation.kafka.formatters.ApicurioFormatter \
  --formatter-config  es-formatter.properties

This isn't an exhaustive list of the events that are possible (look at the schema for that), but rather a few illustrative examples of the sort of events I've been processing.

Page views

Someone has viewed a page on the Loosehanger Jeans website.

The event includes information about what page they viewed, and what device they're using to view it. If the user is logged in, customer details will be included; if they're not logged in, customer will be null. If they came to the Loosehanger Jeans website from somewhere else, the referrer URL will be included.

Pages can contain a list of products (e.g. products in a category, search results, etc.) or be static content (e.g. company information). Pages for an individual product are identified with a different event type: PRODUCT_VIEW.

{
  "sessionid": "sess_lhpiu0oc5m8uxhp8",
  "eventid": "v83x6ltco20s",
  "type": "PAGE_VIEW",
  "context": {
    "device": {
      "type": "Mobile",
      "os": "Android 13",
      "resolution": "412x892"
    },
    "browser": {
      "name": "Samsung Internet",
      "version": "123.0.0.0",
      "useragent": "Mozilla/5.0 (Linux; Android 13; SM-G991B) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/23.0 Chrome/115.0.0.0 Mobile Safari/537.36",
      "enabled": {
        "cookies": true,
        "javascript": true
      }
    },
    "ipaddress": "21.133.10.63",
    "donottrack": false
  },
  "referrer": "https://linkedin.com/search",
  "customer": null,
  "url": "https://loosehangerjeans.com/homepage?gclid=CjwKCAjw9LTFH0SkX3Bd827cL_No&utm_campaign=2025_q1_fall_campaign&utm_medium=organic_search&utm_source=google&utm_content=variant_2",
  "product": null,
  "timestamp": "2025-12-19T11:12:56.423Z"
}

sample page view event 1-page-view.json
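The optional customer and referrer fields are what let us distinguish anonymous from logged-in visitors, and referred from direct traffic. A minimal sketch of that classification in Python - classify is a hypothetical helper, and the event dict is a trimmed version of the sample above:

```python
# hypothetical helper - field names follow the ClickEvent schema
def classify(event):
    visitor = "anonymous" if event["customer"] is None else "logged-in"
    source = "referred" if event["referrer"] else "direct"
    return visitor, source

# trimmed version of the sample page view event above
event = {
    "type": "PAGE_VIEW",
    "referrer": "https://linkedin.com/search",
    "customer": None,
}
print(classify(event))  # ('anonymous', 'referred')
```

The same distinction is what the Flink SQL jobs rely on when filtering for logged-in purchasers.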

Search

Someone has searched for a product on the Loosehanger Jeans website.

If the user is logged in, customer details will be included; if they're not logged in, customer will be null. The event includes information about the device that the user is using.

{
  "sessionid": "sess_qam0wdowklv0kjyv",
  "eventid": "dj9tashimuli",
  "type": "SEARCH",
  "context": {
    "device": {
      "type": "Tablet",
      "os": "iPadOS 18.0",
      "resolution": "2224x1668"
    },
    "browser": {
      "name": "Opera Mobile",
      "version": "121.0.0.0",
      "useragent": "Mozilla/5.0 (iPad; CPU OS 18_0 like Mac OS X) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36 OPR/105.0.0.0",
      "enabled": {
        "cookies": true,
        "javascript": true
      }
    },
    "ipaddress": "174.68.190.120",
    "donottrack": false
  },
  "referrer": null,
  "customer": {
    "id": "a58cf0cf-c861-41bb-8bcf-f7811f5d6f3b",
    "name": "Jefferson Jerde",
    "emails": [
      "jefferson.jerde@example.com",
      "randall.mosciski@example.com"
    ]
  },
  "url": "https://loosehangerjeans.com/search?utm_term=offer&utm_campaign=2025_q2_black_friday&utm_medium=cpc&utm_source=google&utm_content=new_arrival",
  "product": null,
  "timestamp": "2025-12-19T11:10:37.691Z"
}

sample search event 2-search.json

Add to shopping cart

Someone has added a product to their shopping basket.

The event includes a description of the product the user has added. Users do not need to be logged in to do this; they can log in at the point they are ready to check out. The event includes information about the device that the user is using.

{
  "sessionid": "sess_7qcjpyefukhg1pks",
  "eventid": "pk1sncr8c0d0",
  "type": "ADD_TO_CART",
  "context": {
    "device": {
      "type": "Desktop",
      "os": "macOS 14.0",
      "resolution": "1680x1050"
    },
    "browser": {
      "name": "Safari",
      "version": "17.4",
      "useragent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Safari/605.1.15",
      "enabled": {
        "cookies": true,
        "javascript": true
      }
    },
    "ipaddress": "223.102.11.51",
    "donottrack": true
  },
  "referrer": "https://tiktok.com/home",
  "customer": null,
  "url": "https://loosehangerjeans.com/product/79c11c7b18c9?utm_campaign=oct_referral_program&utm_medium=email&utm_source=newsletter&utm_content=footer_link",
  "product": "M Denim Painter Jeans",
  "timestamp": "2025-12-19T11:13:53.407Z"
}

sample add to cart event 3-add-to-cart.json

Remove from shopping cart

Someone has removed a product from their shopping basket.

The event includes a description of the product the user removed. Users do not need to be logged in to do this; they can log in at the point they are ready to check out. The event includes information about the device that the user is using.

{
  "sessionid": "sess_ne7788tsl0baf6uw",
  "eventid": "kaqab311hqbd",
  "type": "REMOVE_FROM_CART",
  "context": {
    "device": {
      "type": "Tablet",
      "os": "iPadOS 18.0",
      "resolution": "1024x768"
    },
    "browser": {
      "name": "Samsung Internet",
      "version": "119.0.0.0",
      "useragent": "Mozilla/5.0 (iPad; CPU OS 18_0 like Mac OS X) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/23.0 Chrome/115.0.0.0 Mobile Safari/537.36",
      "enabled": {
        "cookies": true,
        "javascript": true
      }
    },
    "ipaddress": "101.124.210.4",
    "donottrack": false
  },
  "referrer": null,
  "customer": {
    "id": "39f289a2-0bee-4aac-ab75-75ce562ec68b",
    "name": "Tamie Sauer",
    "emails": [
      "tamie.sauer@example.com"
    ]
  },
  "url": "https://loosehangerjeans.com/cart?fbclid=IwYI3QsCDVYjriqlmNK6VHL3oHb&utm_campaign=may_cyber_monday&utm_medium=paid_social&utm_source=instagram&utm_content=clearance",
  "product": "L Twill Super-skinny Jeans",
  "timestamp": "2025-12-19T11:13:23.344Z"
}

sample remove from cart event 4-remove-from-cart.json

Login

A registered user has logged into the Loosehanger Jeans website.

The event includes details about the user who has logged in, and the device that the user is using.

{
  "sessionid": "sess_sw6yye9oxve9cyus",
  "eventid": "ezd0e7xknve0",
  "type": "LOGIN",
  "context": {
    "device": {
      "type": "Desktop",
      "os": "Windows 10",
      "resolution": "3840x2160"
    },
    "browser": {
      "name": "Chrome",
      "version": "123.0.0.0",
      "useragent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
      "enabled": {
        "cookies": true,
        "javascript": true
      }
    },
    "ipaddress": "111.80.106.143",
    "donottrack": false
  },
  "referrer": null,
  "customer": {
    "id": "6f32c3af-c7e4-4f12-879f-8c64d07021f4",
    "name": "Kurtis Morissette",
    "emails": [
      "kurtis.morissette@example.com"
    ]
  },
  "url": "https://loosehangerjeans.com/login",
  "product": null,
  "timestamp": "2025-12-19T11:13:27.362Z"
}

sample login event 5-login.json

Checkout

A registered user has completed a purchase on the Loosehanger Jeans website. There is a series of events that occur when someone makes a purchase (e.g. CART_VIEW, CHECKOUT_START), but CHECKOUT_COMPLETE is the interesting one, emitted when a purchase is complete.

{
  "sessionid": "sess_ykvbvxr3om7ttz8r",
  "eventid": "ct3t26m0cyul",
  "type": "CHECKOUT_COMPLETE",
  "context": {
    "device": {
      "type": "Desktop",
      "os": "Windows 10",
      "resolution": "1440x900"
    },
    "browser": {
      "name": "Safari",
      "version": "17.3",
      "useragent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Safari/605.1.15",
      "enabled": {
        "cookies": true,
        "javascript": true
      }
    },
    "ipaddress": "135.16.76.23",
    "donottrack": false
  },
  "referrer": null,
  "customer": {
    "id": "96cf812a-99b9-4a81-83af-48072b036da2",
    "name": "Bernadette Legros",
    "emails": [
      "bernadette.legros@example.com"
    ]
  },
  "url": "https://loosehangerjeans.com/cart",
  "product": null,
  "timestamp": "2025-12-19T11:15:32.503Z"
}

sample checkout event 6-checkout.json
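As the schema notes, the sessionid is what ties these individual events together - every click in the same user session shares it, which is what makes journeys like "search, then add to cart, then checkout" detectable. A minimal Python sketch of that correlation, using trimmed illustrative events rather than real topic data:

```python
from collections import defaultdict

# trimmed sample events - only the fields needed for grouping
events = [
    {"sessionid": "sess_a", "type": "SEARCH"},
    {"sessionid": "sess_a", "type": "ADD_TO_CART"},
    {"sessionid": "sess_b", "type": "PAGE_VIEW"},
    {"sessionid": "sess_a", "type": "CHECKOUT_COMPLETE"},
]

# group the event types by session id, preserving event order
sessions = defaultdict(list)
for event in events:
    sessions[event["sessionid"]].append(event["type"])

print(sessions["sess_a"])  # ['SEARCH', 'ADD_TO_CART', 'CHECKOUT_COMPLETE']
```

This is the same per-session correlation that the MATCH_RECOGNIZE examples earlier in the post express declaratively with PARTITION BY on the session id.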

Others

There are other events, but those are enough to give you the idea of the sort of thing that we've got to play with.

Setup

My goal with this post was to inspire - when introducing teams to Flink, I find that it helps to have tangible, concrete examples of what it can be used for, and how to turn ideas into Flink SQL.

If you'd like to try these examples for yourself, you can follow these setup instructions. In summary, I'm using the "Loosehanger Jeans" data generator, which is a configurable Kafka Connect source connector that generates synthetic events for demo and development projects.

I'm using it to produce Avro-encoded events using a Confluent-compatible schema registry.

key.converter: org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable: false
value.converter: io.apicurio.registry.utils.converter.AvroConverter
value.converter.schemas.enable: true

startup.history.enabled: false

topic.name.sensorreadings: SENSOR.READINGS.REG
topic.name.orders: ORDERS.NEW.REG
topic.name.cancellations: CANCELLATIONS.REG
topic.name.badgeins: DOOR.BADGEIN.REG
topic.name.stockmovements: STOCK.MOVEMENT.REG
topic.name.newcustomers: CUSTOMERS.NEW.REG
topic.name.onlineorders: ORDERS.ONLINE.REG
topic.name.clicktracking: CLICKTRACKING.REG
topic.name.outofstocks: STOCK.NOSTOCK.REG
topic.name.returnrequests: PRODUCT.RETURNS.REG
topic.name.productreviews: PRODUCT.REVIEWS.REG
topic.name.transactions: TRANSACTIONS.REG
topic.name.abandonedorders: ORDERS.ABANDONED.REG

timings.ms.badgeins: 600000
cancellations.ratio: 0.05

value.converter.apicurio.auth.username: kafka-connect-credentials
value.converter.apicurio.auth.password: ${file:/opt/kafka/connect-password/kafka-connect-credentials:password}

value.converter.apicurio.registry.request.ssl.truststore.location: /mnt/cluster-ca-cert/ca.p12
value.converter.apicurio.registry.request.ssl.truststore.type: PKCS12
value.converter.apicurio.registry.request.ssl.truststore.password: ${file:/mnt/cluster-ca-cert:ca.password}

value.converter.apicurio.registry.url: {{ schemaregistry_url }}
value.converter.apicurio.registry.auto-register: true
value.converter.apicurio.registry.as-confluent: true
value.converter.apicurio.registry.use-id: contentId
value.converter.apicurio.registry.headers.enabled: false
value.converter.apicurio.registry.dereference-schema: true

data generator config that I used

I used Avro because I wanted to demonstrate how to integrate with a schema registry, but you could simplify this by generating JSON events and changing the Flink connector format to json. That way, you could even run all of it using open-source Kafka and Flink on your own laptop.
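The as-confluent and use-id settings in the config above mean each message value is framed in the Confluent wire format: a single magic byte (0), then a 4-byte big-endian schema id, then the Avro payload. This is what lets Flink look up the right schema per message. A minimal sketch of parsing that framing - the function name and the toy message are my own, for illustration:

```python
import struct

def parse_confluent_header(raw: bytes):
    # Confluent wire format: 1 magic byte (0) + 4-byte big-endian schema id,
    # followed by the Avro-encoded payload
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not Confluent-framed")
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return schema_id, raw[5:]

# toy message: schema id 42 followed by a stand-in payload
raw = b"\x00" + struct.pack(">I", 42) + b"avro-bytes"
schema_id, payload = parse_confluent_header(raw)
print(schema_id, payload)  # 42 b'avro-bytes'
```

A consumer would use the schema id to fetch the matching schema from the registry before decoding the payload - which is exactly what the Flink table's registry connection does automatically.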
