# Aggregations

Aggregations are stateful transformations that group an unbounded set of inputs into sub-sets and then reduce each sub-set to a single output. They are stateful because they must maintain the current state of the computation between one input and the next.

To group an input dataset into sub-sets, a key function must be specified. The result of applying this key function to an input record is used as a discriminator (sometimes called a pivot) that determines which sub-set the record is bucketed into.

The specific transformation that each aggregation performs is described by the Aggregated Functions used in the input query. See a complete list of aggregate functions.

### Aggregations match table semantics  <a href="#aggregations-match-table-semantics" id="aggregations-match-table-semantics"></a>

Notice that the behavior described above is precisely what a Table does: the state is continuously updated for any given key as new events with that key are received. In the case of Aggregations, the new events are the input records in the original dataset that map to a given key and therefore end up in one bucket or another.

Whenever Aggregations are used, the result will be a Table. Each entry will have the key set to the grouping discriminator and the value set to the current computation state for all input records matching the key.
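The relationship between the key function, the per-key state, and the resulting Table can be modelled in a few lines of Python. This is only an illustrative sketch of the semantics, not how the engine is implemented; the `aggregate` helper, the sample records, and the `amount` field are all hypothetical.

```python
def aggregate(records, key_fn, init, step):
    """Fold records into a {key: state} mapping, one bucket per key."""
    table = {}
    for record in records:
        key = key_fn(record)  # the discriminator that selects the bucket
        # Update the state for this key only; other buckets are untouched.
        table[key] = step(table.get(key, init), record)
    return table

# Hypothetical input records, used only for illustration.
records = [
    {"field1": "a", "amount": 1},
    {"field1": "b", "amount": 5},
    {"field1": "a", "amount": 2},
]

# A COUNT(*)-like aggregation grouped by field1.
counts = aggregate(
    records,
    key_fn=lambda r: r["field1"],
    init=0,
    step=lambda state, _record: state + 1,
)
print(counts)  # {'a': 2, 'b': 1}
```

Each entry of `counts` plays the role of a Table row: the key is the grouping discriminator and the value is the current state of the computation for that key.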

### Syntax  <a href="#syntax" id="syntax"></a>

The complete syntax for aggregations is as follows:

```sql
SELECT (STREAM | TABLE)
  <aggregated projection1>
    [, aggregated projection2] ... [, aggregated projectionN]
    [, projection1] ... [, projectionM]
FROM
  <source>
[WINDOW BY <window description>]
GROUP BY <expression>
;
```

The specific syntactical elements of the above are:

* `(STREAM | TABLE)`: specifies if the `<source>` is to be interpreted as a *stream* or a *table*.
* `<aggregated projection1>`: a projection is aggregated when its source contains an Aggregated Function (e.g. `COUNT(*) as x`, `CAST(COUNT(*) as STRING) as stringed`).
* `[, aggregated projection2] ... [, aggregated projectionN]`: a query can contain any additional aggregated projections after the first mandatory one.
* `[, projection1] ... [, projectionM]`: a query can contain any number of common, non-aggregated projections. Streaming only supports full GROUP BY mode. This means that fields that are not part of the `GROUP BY` clause cannot be referenced by non-aggregated projections.
* `<source>`: a normal source, like a topic or the result of a `WITH` statement.
* `[WINDOW BY <window description>]`: this optional section can only be specified if `STREAM` is used. It allows us to describe the *windowing* applied to the aggregation. More on this below.
* `GROUP BY <expression>`: the result of evaluating `<expression>` will be used to divide the input values into different groups. These groups will be the input for each aggregated projection specified. The `<expression>`’s result will become the key for the table resulting from the query.

### Specific rules for aggregated projections  <a href="#specific-rules-for-aggregated-projections" id="specific-rules-for-aggregated-projections"></a>

Most of the rules and syntax described for Projections also apply to aggregated projections, but there are some additional syntactical rules due to the specific nature of Aggregations.

* Aliasing rules mostly work the same, but **it is not possible** to project on the key facet; `COUNT(*) as _key.a` or `SUM(x) as _key` are therefore not allowed.
* At least one aggregated projection must be specified in the query.
* Projections using an unqualified key facet as the source are not allowed. `_key.a` or `COUNT(_key.b)` are forbidden because `_key` is unqualified, but `<source>._key.a` and `COUNT(<source>._key.b)` are supported.

### Grouping and storage format for key  <a href="#grouping-and-storage-format-for-key" id="grouping-and-storage-format-for-key"></a>

```sql
INSERT INTO target-topic
SELECT STREAM  
    COUNT(*) AS records
FROM input-topic
GROUP BY field1;
```

As previously mentioned, the `GROUP BY` clause determines the key of the query’s result; the above query will group all records in `input-topic` by the value of `field1` in each record, and `target-topic`’s key will have the schema of `field1`.

Like in the Projections case, the Streaming mode takes an opinionated approach here and will simplify the result schema and Storage Format in the case of single field structures.

In the case above, assuming for example that `field1` is an integer, `target-topic`’s key **will not** be a structure with a single integer `field1` field, but rather just the value of `field1`; the resulting storage format will be `INT`, and the label `field1` will simply be dropped.

If the above behavior is not desirable, it can be overridden by specifying an explicit alias.

```sql
INSERT INTO target-topic
SELECT STREAM  
    COUNT(*) AS records
FROM input-topic
GROUP BY field1 AS keep_me;
```

This will result in `target-topic`’s key being a structure with a single field `keep_me`, with the same schema as `field1`. The corresponding Storage Format will match the input format of `input-topic` (`AVRO` or `JSON`).

### Semantics  <a href="#semantics" id="semantics"></a>

An example will help clarify how aggregations work and how they behave depending on the semantics of the input dataset they are being applied to.

#### Example scenario  <a href="#example-scenario" id="example-scenario"></a>

Assume that we have a Kafka topic (`gaming-sessions`) containing these records:

| Offset | Key   | Value                          |
| ------ | ----- | ------------------------------ |
| 0      | billy | {points: 50, country: "uk"}    |
| 1      | billy | {points: 90, country: "uk"}    |
| 2      | willy | {points: 70, country: "uk"}    |
| 3      | noel  | {points: 82, country: "uk"}    |
| 4      | john  | {points: 50, country: "usa"}   |
| 5      | dave  | {points: 30, country: "usa"}   |
| 6      | billy | {points: 90, country: "spain"} |

This data describes a series of gaming sessions. Each record holds the player (used as the Key), the points achieved, and the country where the session took place.

### Aggregating a Stream  <a href="#aggregating-a-stream" id="aggregating-a-stream"></a>

Let’s assume that we want to calculate the total points achieved by players in a given country and the average points per game.\
One way to achieve the desired behavior is to build a Stream from the input topic. Remember that this means that each event will be considered in isolation.

```sql
INSERT INTO target-topic
SELECT STREAM
    SUM(points) AS total_points
    , AVG(points) AS average_points
FROM gaming-sessions
GROUP BY country
```

Each element of this syntax is explained in the Syntax section but, very briefly, this builds a Stream from `gaming-sessions`, groups all events by `country` (i.e., all records with the same `country` are aggregated together), and finally calculates the total (`total_points`) and the average (`average_points`) of all points for each group.

The final result in `target-topic` will be (disregarding intermediate events):

| Offset | Key   | Value                                     |
| ------ | ----- | ----------------------------------------- |
| 3      | uk    | {total\_points: 292, average\_points: 73} |
| 5      | usa   | {total\_points: 80, average\_points: 40}  |
| 6      | spain | {total\_points: 90, average\_points: 90}  |

The results are calculated from the totality of the input records because, in a Stream, each event is independent of and unrelated to any other.
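The arithmetic behind this result can be reproduced with a small Python model of the Stream semantics. It is a sketch for illustration only (the function name `aggregate_stream` is an assumption and says nothing about the engine's implementation): every record contributes to its country's bucket, regardless of which player produced it.

```python
# The records of `gaming-sessions`, as (key, value) pairs in offset order.
sessions = [
    ("billy", {"points": 50, "country": "uk"}),
    ("billy", {"points": 90, "country": "uk"}),
    ("willy", {"points": 70, "country": "uk"}),
    ("noel", {"points": 82, "country": "uk"}),
    ("john", {"points": 50, "country": "usa"}),
    ("dave", {"points": 30, "country": "usa"}),
    ("billy", {"points": 90, "country": "spain"}),
]

def aggregate_stream(events):
    """Stream semantics: each event is independent, so all of them count."""
    state = {}  # country -> (running sum, running count)
    for _player, value in events:
        total, count = state.get(value["country"], (0, 0))
        state[value["country"]] = (total + value["points"], count + 1)
    return {
        country: {"total_points": total, "average_points": total / count}
        for country, (total, count) in state.items()
    }

stream_result = aggregate_stream(sessions)
for country, stats in stream_result.items():
    print(country, stats)
# uk {'total_points': 292, 'average_points': 73.0}
# usa {'total_points': 80, 'average_points': 40.0}
# spain {'total_points': 90, 'average_points': 90.0}
```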

### Aggregating a Table  <a href="#aggregating-a-table" id="aggregating-a-table"></a>

We now want to calculate something similar to what we obtained before, but we want to keep track only of the last session played by a player, as it might give us a better snapshot of the performances and locations of players worldwide. The statistics we want to gather are the same: total and average points per country.

The way to achieve the above requirement is simply to read `gaming-sessions` into a Table rather than a Stream and aggregate it.

```sql
INSERT INTO target-topic
SELECT TABLE
    SUM(points) AS total_points
    , AVG(points) AS average_points
FROM gaming-sessions
GROUP BY country
```

The final result in `target-topic` will be (disregarding intermediate events):

| Key   | Value                                     |
| ----- | ----------------------------------------- |
| uk    | {total\_points: 152, average\_points: 76} |
| usa   | {total\_points: 80, average\_points: 40}  |
| spain | {total\_points: 90, average\_points: 90}  |

Compare this with the behavior from the previous scenario; the key difference is that the value for `uk` includes only `willy` and `noel`, and that’s because the last event moved `billy` to the `spain` bucket, removing all data regarding him from his original group.
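The retraction that moves `billy` from `uk` to `spain` can be made explicit with a small Python model of the Table semantics. This is a sketch under simplifying assumptions (the name `aggregate_table` and the in-memory dictionaries are illustrative, not the engine's actual mechanism): before a new record for a key is added, the previous record for that key is subtracted from the group it belonged to.

```python
# The records of `gaming-sessions`, as (key, value) pairs in offset order.
sessions = [
    ("billy", {"points": 50, "country": "uk"}),
    ("billy", {"points": 90, "country": "uk"}),
    ("willy", {"points": 70, "country": "uk"}),
    ("noel", {"points": 82, "country": "uk"}),
    ("john", {"points": 50, "country": "usa"}),
    ("dave", {"points": 30, "country": "usa"}),
    ("billy", {"points": 90, "country": "spain"}),
]

def aggregate_table(events):
    """Table semantics: only the latest record per key contributes."""
    latest = {}  # player -> latest value seen (the Table view of the topic)
    state = {}   # country -> [running sum, running count]
    for player, value in events:
        old = latest.get(player)
        if old is not None:
            # Retract the obsolete record from the group it used to belong to.
            bucket = state[old["country"]]
            bucket[0] -= old["points"]
            bucket[1] -= 1
            if bucket[1] == 0:
                del state[old["country"]]
        # Add the new record to its (possibly different) group.
        bucket = state.setdefault(value["country"], [0, 0])
        bucket[0] += value["points"]
        bucket[1] += 1
        latest[player] = value
    return {
        country: {"total_points": total, "average_points": total / count}
        for country, (total, count) in state.items()
    }

table_result = aggregate_table(sessions)
for country, stats in table_result.items():
    print(country, stats)
# uk {'total_points': 152, 'average_points': 76.0}
# usa {'total_points': 80, 'average_points': 40.0}
# spain {'total_points': 90, 'average_points': 90.0}
```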

### Aggregation functions for Tables

The previous section described the behavior of aggregations when applied to Tables and highlighted how aggregations not only need to be able to *add* the latest values received to the current state of a group but also need to be able to *subtract* an obsolete value that might have just been assigned to a new group. As we saw above, it is easy to do this in the case of `SUM` and `AVG`.

However, consider what would happen if we wanted to add a new statistic to the ones calculated above: the maximum points achieved by a player in a given country.

In the Stream scenario, this can be achieved by simply adding `MAXK(points,1) as max_points` to the query.

```sql
INSERT INTO target-topic
SELECT STREAM
    SUM(points) AS total_points
    , AVG(points) AS average_points
    , MAXK(points,1) AS max_points
FROM gaming-sessions
GROUP BY country
```

| Offset | Key   | Value                                                      |
| ------ | ----- | ---------------------------------------------------------- |
| 3      | uk    | {total\_points: 292, average\_points: 73, max\_points: 90} |
| 5      | usa   | {total\_points: 80, average\_points: 40, max\_points: 50}  |
| 6      | spain | {total\_points: 90, average\_points: 90, max\_points: 90}  |

In the Table scenario, however, things are different. We know that the final event *moves* `billy` from `uk` to `spain`, so we need to *subtract* from `uk` all information related to `billy`. In the case of `SUM` and `AVG` that’s possible because subtracting `billy`’s points from the current value of the aggregation returns the correct result.

But that’s not possible for `MAXK`. `MAXK(points, 1)` only keeps track of `1` value, the highest seen so far, and if that’s removed, what value should take its place? The aggregation function cannot inspect the entire topic data to find the correct answer. The state the aggregation function has access to is that single number `90`, which is now invalid.

This problem explains why **some aggregated functions can be used on Streams and Tables both (e.g. `SUM`), while others can be used only on Streams (e.g. `MAXK`)**.

The key factor is usually whether a hypothetical subtraction operation would need access to all previous inputs to calculate its new value (like `MAXK`) or just the aggregated state (like `SUM`).
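The difference can be sketched in Python with two toy aggregation states. The classes `SumState` and `MaxState` are hypothetical and exist only to illustrate the argument: a running sum carries enough information to undo a contribution, while a running maximum does not.

```python
class SumState:
    """Running sum: subtraction needs nothing beyond the state itself."""
    def __init__(self):
        self.total = 0

    def add(self, points):
        self.total += points

    def subtract(self, points):
        self.total -= points


class MaxState:
    """Running maximum: keeps a single number, the highest seen so far."""
    def __init__(self):
        self.maximum = None

    def add(self, points):
        if self.maximum is None or points > self.maximum:
            self.maximum = points

    def subtract(self, points):
        if points == self.maximum:
            # The runner-up was never stored, so the state cannot be repaired
            # without re-reading every previous input.
            raise ValueError("cannot retract the current maximum")
        # Removing a smaller value leaves the maximum unchanged.


# State of the `uk` group just before billy moves to `spain`
# (latest sessions: billy 90, willy 70, noel 82).
uk_sum, uk_max = SumState(), MaxState()
for points in (90, 70, 82):
    uk_sum.add(points)
    uk_max.add(points)

uk_sum.subtract(90)
print(uk_sum.total)  # 152

try:
    uk_max.subtract(90)
    max_retract_failed = False
except ValueError:
    max_retract_failed = True
print(max_retract_failed)  # True
```

In this example the correct new maximum for `uk` would be `82`, but that value is not recoverable from the single number `90` held in the state.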

### Windowed aggregations  <a href="#windowed-aggregations" id="windowed-aggregations"></a>

A common scenario in the context of aggregations is adding a time dimension to the grouping logic expressed in the query. For example, one might want to group by a given field all input records that were received within 1 hour of each other.

SQL Streaming supports windowed aggregations through an additional clause in the query, `WINDOW BY`, that expresses this time dimension. Tables cannot be aggregated using a window: a table represents the *latest* state of a set of (Key, Value) pairs, not a series of events spread over a time continuum, so windowing a table is not a meaningful operation.

Details of the supported types of windows, their specific syntax, and the fundamental relationship between stream processing and time can be found on this documentation's Time and Windows page.
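As a rough intuition for what windowing adds, the Python sketch below extends the grouping key with a time bucket. It assumes fixed, non-overlapping 1-hour windows and uses made-up timestamps; both are illustrative assumptions, and the window types actually supported (and their syntax) are those described on the Time and Windows page.

```python
WINDOW_MS = 60 * 60 * 1000  # assumed window size: 1 hour, in milliseconds

def window_start(timestamp_ms):
    """Start of the fixed 1-hour window that contains the timestamp."""
    return timestamp_ms - timestamp_ms % WINDOW_MS

# Hypothetical events: (timestamp in ms, country, points).
events = [
    (0, "uk", 50),
    (30 * 60 * 1000, "uk", 90),  # 30 minutes in: same window as the first
    (90 * 60 * 1000, "uk", 70),  # 90 minutes in: falls in the next window
]

totals = {}
for timestamp_ms, country, points in events:
    # The group is identified by the key *and* the window it falls into.
    key = (country, window_start(timestamp_ms))
    totals[key] = totals.get(key, 0) + points

print(totals)  # {('uk', 0): 140, ('uk', 3600000): 70}
```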

### Filtering aggregated queries  <a href="#filtering-aggregated-queries" id="filtering-aggregated-queries"></a>

Filtering the input into aggregated queries is similar to filtering non-aggregated ones. When using a `WHERE <expression>` statement, where *`<expression>`* is a valid SQL boolean expression, all records that do not match the predicate will be left out.

However, aggregated functions add a further dimension to what it might be desirable to filter.

We might be interested in filtering on a condition over the groups themselves; for example, we might want to count the gaming sessions played in each `country`, *but only if the total is greater than 3*. In this case, `WHERE` would not help because it has access neither to the groups nor to the results of the *aggregated projections*. The query below is what is needed.

```sql
INSERT INTO target-topic
SELECT STREAM
    COUNT(*) as sessions
FROM gaming-sessions
GROUP BY country
HAVING sessions > 3;
```

The above query uses the `HAVING` clause to express a filter at a grouping level. Using this feature, it is possible to express a predicate on the result of aggregated projections and filter out the output records that do not satisfy it.

Note that only aggregated projections specified in the `SELECT` clause can be used within the `HAVING` clause.
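The difference between the two kinds of filter can be modelled in Python: a `WHERE`-style predicate sees individual input records before they are grouped, while a `HAVING`-style predicate sees the aggregated value of each group. The helper `count_by_country` and the `points > 50` predicate are assumptions made for this sketch, which uses the `gaming-sessions` records from the example scenario and ignores intermediate results.

```python
# The records of `gaming-sessions`, as (key, value) pairs in offset order.
sessions = [
    ("billy", {"points": 50, "country": "uk"}),
    ("billy", {"points": 90, "country": "uk"}),
    ("willy", {"points": 70, "country": "uk"}),
    ("noel", {"points": 82, "country": "uk"}),
    ("john", {"points": 50, "country": "usa"}),
    ("dave", {"points": 30, "country": "usa"}),
    ("billy", {"points": 90, "country": "spain"}),
]

def count_by_country(events, where=lambda value: True, having=lambda n: True):
    """COUNT(*) per country with optional record-level and group-level filters."""
    counts = {}
    for _player, value in events:
        if where(value):  # WHERE: applied to each input record
            counts[value["country"]] = counts.get(value["country"], 0) + 1
    # HAVING: applied to the aggregated result of each group
    return {country: n for country, n in counts.items() if having(n)}

# Equivalent in spirit to `HAVING sessions > 3`.
having_result = count_by_country(sessions, having=lambda n: n > 3)
print(having_result)  # {'uk': 4}

# A record-level filter instead (hypothetical predicate: points > 50).
where_result = count_by_country(sessions, where=lambda v: v["points"] > 50)
print(where_result)  # {'uk': 3, 'spain': 1}
```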

