# Joins

Joins allow rows from different sources to be combined.

Lenses allows two data sources to be combined either on the equality of their *\_key* facet or on a user-provided expression.

A query using joins looks like a regular query apart from the definition of its source and, in some cases, the need to specify a window expression:

```sql
SELECT (STREAM|TABLE)
  <projection>
FROM
  <sourceA> [LEFT|RIGHT|INNER|OUTER] JOIN
  <sourceB> [ON <joinExpression>]
  [WITHIN <duration>]
WHERE
  <filterExpression>;
```

* **projection**: the projection of a join query differs very little from a regular projection. Because data is selected from two sources, some field names may be common to both; use the `source.field` syntax to avoid ambiguity.
* **sourceA**/**sourceB**: the two sources of data to combine.
* **duration**: only used when two streams are joined. Specifies the interval of time in which to search for matching records.
* **joinExpression**: a boolean expression that specifies how records from the two sources are matched.
* **filterExpression**: a filter expression specifying which records should be kept.

### Join types  <a href="#join-types" id="join-types"></a>

When two sources of data are combined, it is possible to control which records to keep when a match is not found.

Note: the following examples do not consider windowing or table materialization concerns; these are covered in the windowing section.

**Customers**

| \_key | id | name  |
| ----- | -- | ----- |
| 1     | 1  | John  |
| 2     | 2  | Frank |

**Orders**

| \_key | customer\_id | item     |
| ----- | ------------ | -------- |
| 1     | 1            | Computer |
| 2     | 1            | Mouse    |
| 3     | 3            | Keyboard |

### INNER JOIN / JOIN  <a href="#inner-join--join" id="inner-join--join"></a>

```sql
WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);

INSERT INTO result
SELECT STREAM
    customersTable.name
    , ordersStream.item
 FROM
    ordersStream JOIN customersTable
        ON customersTable.id = ordersStream.customer_id;
```

This join type will only emit records where a match has occurred.

| name | item     |
| ---- | -------- |
| John | Computer |
| John | Mouse    |

(Notice that there is no order with `customer_id = 2` and no customer with `id = 3`, so neither Frank nor the Keyboard order appears in the result.)
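To make the matching rule concrete, here is a minimal plain-Python model of the INNER JOIN above, run over the sample Customers and Orders tables. It is only an illustration of the semantics (a nested loop over in-memory lists), not how Lenses evaluates joins; `inner_join` and the variable names are hypothetical.

```python
# Plain-Python model of the INNER JOIN example (illustration only, not Lenses code).
customers = [
    {"_key": 1, "id": 1, "name": "John"},
    {"_key": 2, "id": 2, "name": "Frank"},
]
orders = [
    {"_key": 1, "customer_id": 1, "item": "Computer"},
    {"_key": 2, "customer_id": 1, "item": "Mouse"},
    {"_key": 3, "customer_id": 3, "item": "Keyboard"},
]


def inner_join(orders, customers):
    """Emit one row per (order, customer) pair where customers.id = orders.customer_id."""
    result = []
    for order in orders:
        for customer in customers:
            if customer["id"] == order["customer_id"]:
                result.append({"name": customer["name"], "item": order["item"]})
    return result


for row in inner_join(orders, customers):
    print(row)
```

Orders or customers without a counterpart simply never reach the `append`, which is why Frank and the Keyboard order are absent.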

### LEFT JOIN  <a href="#left-join" id="left-join"></a>

```sql
WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);

INSERT INTO result
SELECT STREAM
    customersTable.name
    , ordersStream.item
 FROM
    ordersStream LEFT JOIN customersTable
        ON customersTable.id = ordersStream.customer_id;
```

This join type selects all the records from the left side of the join regardless of a match:

| name | item     |
| ---- | -------- |
| John | Computer |
| John | Mouse    |
| null | Keyboard |

(Notice that all the rows from orders are present, but since there is no customer with `id = 3`, the name for the Keyboard order is null.)
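The LEFT JOIN differs from the inner join only in what happens when no match is found. The sketch below models that in plain Python, using `None` to stand in for null; it is an illustration of the semantics, and `left_join` is a hypothetical name, not a Lenses API.

```python
# Plain-Python model of the LEFT JOIN example (illustration only, not Lenses code).
customers = [
    {"_key": 1, "id": 1, "name": "John"},
    {"_key": 2, "id": 2, "name": "Frank"},
]
orders = [
    {"_key": 1, "customer_id": 1, "item": "Computer"},
    {"_key": 2, "customer_id": 1, "item": "Mouse"},
    {"_key": 3, "customer_id": 3, "item": "Keyboard"},
]


def left_join(orders, customers):
    """Keep every order (the left side); fill the customer fields with None when unmatched."""
    result = []
    for order in orders:
        matches = [c for c in customers if c["id"] == order["customer_id"]]
        if matches:
            for customer in matches:
                result.append({"name": customer["name"], "item": order["item"]})
        else:
            # No customer with this id: the order is kept and name becomes null.
            result.append({"name": None, "item": order["item"]})
    return result


for row in left_join(orders, customers):
    print(row)
```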

### RIGHT JOIN  <a href="#right-join" id="right-join"></a>

```sql
WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);

INSERT INTO result
SELECT STREAM
    customersTable.name
    , ordersStream.item
 FROM
    customersTable RIGHT JOIN ordersStream
        ON customersTable.id = ordersStream.customer_id;
```

A right join can be seen as a mirror of a LEFT JOIN. It selects all the records from the right side of the join regardless of a match:

| name | item     |
| ---- | -------- |
| John | Computer |
| John | Mouse    |
| null | Keyboard |

### OUTER JOIN  <a href="#outer-join" id="outer-join"></a>

```sql
WITH customersStream AS (SELECT STREAM * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);

INSERT INTO result
SELECT STREAM
    customersStream.name
    , ordersStream.item
 FROM
    ordersStream OUTER JOIN customersStream
        ON customersStream.id = ordersStream.customer_id;
```

An outer join can be seen as the union of left and right joins. It selects all records from the left and right side of the join regardless of a match happening:

| name  | item     |
| ----- | -------- |
| John  | Computer |
| John  | Mouse    |
| null  | Keyboard |
| Frank | null     |
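The "mirror" and "union" descriptions can be checked with a small plain-Python model: `right_join` is `left_join` with its inputs swapped, and `outer_join` is the left join plus the right-side rows that matched nothing. This is an illustration over the sample tables only; all function names are hypothetical.

```python
# Plain-Python model of LEFT, RIGHT and OUTER joins (illustration only, not Lenses code).
customers = [
    {"_key": 1, "id": 1, "name": "John"},
    {"_key": 2, "id": 2, "name": "Frank"},
]
orders = [
    {"_key": 1, "customer_id": 1, "item": "Computer"},
    {"_key": 2, "customer_id": 1, "item": "Mouse"},
    {"_key": 3, "customer_id": 3, "item": "Keyboard"},
]


def left_join(left, right, on):
    """Keep every left row, paired with each matching right row or with None."""
    pairs = []
    for l_row in left:
        matches = [r_row for r_row in right if on(l_row, r_row)]
        if matches:
            pairs.extend((l_row, r_row) for r_row in matches)
        else:
            pairs.append((l_row, None))
    return pairs


def right_join(left, right, on):
    """Mirror of left_join: swap the inputs, then swap each pair back."""
    swapped = left_join(right, left, lambda r_row, l_row: on(l_row, r_row))
    return [(l_row, r_row) for r_row, l_row in swapped]


def outer_join(left, right, on):
    """Union of left and right joins: all left-join pairs plus unmatched right rows."""
    pairs = left_join(left, right, on)
    pairs.extend(
        (None, r_row) for l_row, r_row in right_join(left, right, on) if l_row is None
    )
    return pairs


def on(order, customer):
    # customersStream.id = ordersStream.customer_id
    return customer["id"] == order["customer_id"]


def project(pairs):
    """SELECT name, item, with None standing in for null."""
    return [
        {"name": customer["name"] if customer else None,
         "item": order["item"] if order else None}
        for order, customer in pairs
    ]


for row in project(outer_join(orders, customers, on)):
    print(row)
```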

### Matching expression (ON)  <a href="#matching-expression-on" id="matching-expression-on"></a>

If no ON expression is provided, the join will be evaluated based on the equality of the \_key facet by default. This means that the following queries are equivalent:

```sql
SELECT TABLE *
FROM customers JOIN orders;

SELECT TABLE *
FROM customers JOIN orders
    ON customers._key = orders._key;
```

However, when an expression is provided, there are limitations regarding what kind of expressions can be evaluated.

Currently, the following expression types are supported:

* Equality (`=`) expressions with one table on each side:
  * `customers.id = orders.customer_id`
  * `customers.id - 1 = orders.customer_id - 1`
  * `substr(customers.name, 5) = orders.item`
* Any boolean expression that references only one table:
  * `len(customers.name) > 10`
  * `substr(customers.name, 1) = "J"`
  * `len(customers.name) > 10 OR customers._key > 1`
* Any of the allowed expressions above combined with the AND operator:
  * `customers._key = orders.customer_id AND len(customers.name) > 10`
  * `len(customers.name) > 10 AND substr(customers.name, 1) = "J"`
  * `substr(customers.name, 5) = orders.item AND (len(customers.name) > 10 OR customers._key > 1)`

Any expression that does not follow the rules above will be rejected:

* More than one table is referenced on the same side of the equality operator:
  * `concat(customers.name, orders.item) = "John"`
  * `customers._key - orders.customer_id = 0`
* A boolean expression that is not split by AND references more than one table:
  * `customers._key = 1 OR customers._key = orders.customer_id`
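The rules above can be read as a small recursive check. The following sketch is a hypothetical model, not the Lenses parser: every sub-expression other than `AND` and `=` is reduced to the set of tables it references (the `Other` node), which is all the rules look at.

```python
# Hypothetical model of the ON-expression rules listed above (not the Lenses parser).
from dataclasses import dataclass
from typing import FrozenSet, Union


@dataclass(frozen=True)
class Other:
    """Any expression that is not AND or '=' (a function call, a comparison,
    an OR, a literal...), summarised by the tables it references."""
    tables: FrozenSet[str]


@dataclass(frozen=True)
class Eq:
    left: "Expr"
    right: "Expr"


@dataclass(frozen=True)
class And:
    left: "Expr"
    right: "Expr"


Expr = Union[Other, Eq, And]


def tables_of(expr: Expr) -> FrozenSet[str]:
    if isinstance(expr, Other):
        return expr.tables
    return tables_of(expr.left) | tables_of(expr.right)


def is_valid_on(expr: Expr) -> bool:
    if isinstance(expr, And):
        # Allowed expressions can be combined with AND.
        return is_valid_on(expr.left) and is_valid_on(expr.right)
    if len(tables_of(expr)) <= 1:
        # Any boolean expression that references a single table.
        return True
    if isinstance(expr, Eq):
        # An equality with exactly one table on each side.
        return len(tables_of(expr.left)) == 1 and len(tables_of(expr.right)) == 1
    # Anything else referencing two tables (e.g. an OR) is rejected.
    return False


def t(*names: str) -> Other:
    """Shorthand for an opaque sub-expression referencing the given tables."""
    return Other(frozenset(names))


# substr(customers.name, 5) = orders.item AND (len(customers.name) > 10 OR customers._key > 1)
print(is_valid_on(And(Eq(t("customers"), t("orders")), t("customers"))))  # True
# customers._key - orders.customer_id = 0
print(is_valid_on(Eq(t("customers", "orders"), t())))  # False
# customers._key = 1 OR customers._key = orders.customer_id
print(is_valid_on(t("customers", "orders")))  # False
```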

### Windowing: stream-to-stream joins (`WITHIN <duration>`)  <a href="#windowing-stream-to-stream-joins-within-duration" id="windowing-stream-to-stream-joins-within-duration"></a>

When two streams are joined, Lenses needs to know how far away in the past and future to look for a matching record.

This approach is called a “Sliding Window” and works like this:

**Customers**

| arrival | id | name  |
| ------- | -- | ----- |
| t = 6s  | 1  | Frank |
| t = 20s | 2  | John  |

**Purchases**

| arrival | item     | customer\_id |
| ------- | -------- | ------------ |
| t = 10s | Computer | 1            |
| t = 11s | Keyboard | 2            |

```sql
SELECT STREAM
    customers.name
    , orders.item
FROM
    orders LEFT JOIN customers
        ON customers.id = orders.customer_id
WITHIN 5s;
```

When the Computer record arrives at `t = 10s`, the window is \[10-5, 10+5]s, that is \[5, 15]s, which contains Frank's record (`t = 6s`). When the Keyboard record arrives at `t = 11s`, the window is \[11-5, 11+5]s, that is \[6, 16]s, and it contains no customer with `id = 2`.

This means that the following would be the result of running the query:

| name  | item     |
| ----- | -------- |
| Frank | Computer |
| null  | Keyboard |

Note: John does not match the Keyboard purchase because *t = 20s* falls outside the window interval \[11-5, 11+5]s.

Read more about time windows in the windowing section.
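The window check can be sketched in plain Python. This model evaluates the window over the complete sample data at once, so it ignores processing order and exactly when a null row would be emitted; treating both window bounds as inclusive is an assumption. `windowed_left_join` is a hypothetical name.

```python
# Illustration of the sliding-window LEFT JOIN above; not how Lenses implements it.
# (arrival time in seconds, id, name)
customers = [(6, 1, "Frank"), (20, 2, "John")]
# (arrival time in seconds, item, customer_id)
orders = [(10, "Computer", 1), (11, "Keyboard", 2)]


def windowed_left_join(orders, customers, window_s):
    """For each order, match customers with the same id whose arrival time lies in
    [order_time - window_s, order_time + window_s]; emit None when nothing matches."""
    result = []
    for order_time, item, customer_id in orders:
        names = [
            name
            for customer_time, cid, name in customers
            if cid == customer_id and abs(customer_time - order_time) <= window_s
        ]
        if names:
            result.extend((name, item) for name in names)
        else:
            result.append((None, item))
    return result


print(windowed_left_join(orders, customers, window_s=5))
# [('Frank', 'Computer'), (None, 'Keyboard')]
```

Widening the window to 10s would let John (`t = 20s`) fall inside \[1, 21]s and match the Keyboard purchase.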

### Non-windowed joins (stream to table and table to stream)  <a href="#non-windowed-joins-stream-to-table-and-table-to-stream" id="non-windowed-joins-stream-to-table-and-table-to-stream"></a>

When streaming data, records can be produced at different rates and even out of order. Often, a match may not be found because a record hasn’t arrived yet.

The following example shows a join between a stream and a table where the purchase information becomes available before the customer information does.

(Notice that the purchase of a “Keyboard” by `customer_id = 2` is produced before the record with that customer’s details.)

**Customers**

| arrival | id | name  |
| ------- | -- | ----- |
| t = 0s  | 1  | Frank |
| t = 10s | 2  | John  |

**Purchases**

| arrival | item     | customer\_id |
| ------- | -------- | ------------ |
| t = 0s  | Computer | 1            |
| t = 1s  | Keyboard | 2            |

Running the following query:

```sql
WITH customersTable AS (SELECT TABLE * FROM customers);

SELECT STREAM
    customersTable.name
    , orders.item
FROM
    orders LEFT JOIN customersTable
        ON customersTable.id = orders.customer_id;
```

would result in the following:

| name  | item     |
| ----- | -------- |
| Frank | Computer |
| null  | Keyboard |

If later, the record for `customer_id = 2` is available:

| arrival | id | name |
| ------- | -- | ---- |
| t = 10s | 2  | John |

a record would be emitted with the result now looking like the following:

| name  | item     |
| ----- | -------- |
| Frank | Computer |
| null  | Keyboard |
| John  | Keyboard |

Notice that “Keyboard” appears twice: once when the customer data is missing and again when it becomes available.

This scenario will happen whenever a Stream is joined with a Table using a non-inner join.
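The sequence above can be modelled as a loop over events in arrival order, with the table materialized as a dictionary. This is a hypothetical sketch of the behaviour described in this section (including the extra row emitted when the late customer record arrives), not a description of Lenses internals.

```python
# Illustration of the stream-table LEFT JOIN behaviour described above
# (hypothetical model, not Lenses internals).
# Events in arrival order: (arrival time in seconds, source, record)
events = [
    (0, "customers", {"id": 1, "name": "Frank"}),
    (0, "orders", {"item": "Computer", "customer_id": 1}),
    (1, "orders", {"item": "Keyboard", "customer_id": 2}),
    (10, "customers", {"id": 2, "name": "John"}),
]


def stream_table_left_join(events):
    table = {}       # materialized customers table: id -> name
    unmatched = []   # orders that were emitted with a null name
    emitted = []     # (name, item) rows in emission order
    for _, source, record in events:
        if source == "customers":
            table[record["id"]] = record["name"]
            # A late customer record produces a new row for every order
            # that previously had no match, as in the example above.
            for order in [o for o in unmatched if o["customer_id"] == record["id"]]:
                emitted.append((record["name"], order["item"]))
                unmatched.remove(order)
        else:
            name = table.get(record["customer_id"])  # None plays the role of null
            emitted.append((name, record["item"]))
            if name is None:
                unmatched.append(record)
    return emitted


for row in stream_table_left_join(events):
    print(row)
```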

### Table/Stream joins compatibility table  <a href="#tablestream-joins-compatibility-table" id="tablestream-joins-compatibility-table"></a>

The following table shows which combinations of table/stream joins are available:

| Left   | Right  | Allowed types    | Window   | Result |
| ------ | ------ | ---------------- | -------- | ------ |
| Stream | Stream | All              | Required | Stream |
| Table  | Table  | All              | No       | Table  |
| Table  | Stream | RIGHT JOIN       | No       | Stream |
| Stream | Table  | INNER, LEFT JOIN | No       | Stream |
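The compatibility table can also be read as a lookup. The helper below is hypothetical (not a Lenses API) and encodes only what the table states; since the table does not say whether a `WITHIN` clause is rejected for non-stream pairs, the sketch does not check that case.

```python
# The compatibility table above as a lookup (hypothetical helper, not a Lenses API).
from typing import Optional

ALL_TYPES = {"INNER", "LEFT", "RIGHT", "OUTER"}

# (left, right) -> (allowed join types, window required, result type)
COMPATIBILITY = {
    ("stream", "stream"): (ALL_TYPES, True, "stream"),
    ("table", "table"): (ALL_TYPES, False, "table"),
    ("table", "stream"): ({"RIGHT"}, False, "stream"),
    ("stream", "table"): ({"INNER", "LEFT"}, False, "stream"),
}


def join_result(left: str, right: str, join_type: str, has_window: bool) -> Optional[str]:
    """Return the result type ("stream" or "table"), or None if the combination is not allowed."""
    allowed, window_required, result = COMPATIBILITY[(left, right)]
    if join_type not in allowed:
        return None
    if window_required and not has_window:
        return None
    return result


print(join_result("stream", "table", "LEFT", has_window=False))    # stream
print(join_result("table", "stream", "LEFT", has_window=False))    # None
print(join_result("stream", "stream", "INNER", has_window=False))  # None: WITHIN is required
```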

### Key decoder types  <a href="#key-decoder-types" id="key-decoder-types"></a>

To evaluate a join between two sources, the key facets of both sources must share the same initial format.

If the formats differ, the join can’t be evaluated. To address this, create an intermediate topic with the correct format using a STORE AS statement, and use this newly created topic as the new source.

### Co-partitioning  <a href="#co-partitioning" id="co-partitioning"></a>

In addition to the constraint above, both sources of a join must have the same number of partitions.

When the partition counts differ, an additional step is added to the join evaluation to make them equal. This step writes the data from the source topic with fewer partitions into an intermediate topic.

This newly created topic matches the partition count of the source with the higher partition count.

In the topology view, this step will appear as a Repartition Node.
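Why the counts must match, and which source gets repartitioned, can be sketched as follows. `partition_for` is a simplified stand-in for Kafka's default partitioner (which applies murmur2 to the serialized key before taking the modulo), and `plan_repartition` is a hypothetical helper mirroring the rule described above.

```python
# Hypothetical sketch of the co-partitioning decision described above.
from typing import Optional, Tuple


def partition_for(key: int, num_partitions: int) -> int:
    # Simplified stand-in for Kafka's default partitioner, which hashes the
    # serialized key with murmur2; only the modulo step matters here.
    return key % num_partitions


def plan_repartition(left: Tuple[str, int], right: Tuple[str, int]) -> Optional[Tuple[str, int]]:
    """Given (topic, partition count) for both sources, return the topic to copy into an
    intermediate topic and the target partition count, or None if counts already match."""
    (left_topic, left_count), (right_topic, right_count) = left, right
    if left_count == right_count:
        return None
    if left_count < right_count:
        return (left_topic, right_count)
    return (right_topic, left_count)


# The same key can land in different partitions when the counts differ,
# so matching records would not be processed together.
print(partition_for(4, 3), partition_for(4, 6))  # 1 4
print(plan_repartition(("orders", 3), ("customers", 6)))  # ('orders', 6)
```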

### ON expressions and key change  <a href="#on-expressions-and-key-change" id="on-expressions-and-key-change"></a>

Joining two topics is only possible if the two sources used in the join share the same key shape and decoder.

When an ON expression is specified, the original key facet has to change so that it matches the expression provided in the ON clause. Lenses performs this calculation automatically. As a result, the key schema of the result is not the same as that of either source; instead, it is an object calculated by Lenses that corresponds to the join expression specified in the query.

### Nullability  <a href="#nullability" id="nullability"></a>

As discussed in the join types section, some fields may contain null values when non-inner joins are used.

For this reason, fields that may be null are typed as the union of null and their original type.

### Joining more than 2 sources  <a href="#joining-more-than-2-sources" id="joining-more-than-2-sources"></a>

Within the same query, joins may only be evaluated between two sources.

When a join between more than two sources is required, multiple queries can be combined using a WITH statement:

```sql
WITH customerOrders AS (
 SELECT TABLE
    customers.name
    , orders.item
    , orders._key AS order_id
 FROM
    customers INNER JOIN orders
        ON orders.customer_id = customers.id
);

INSERT INTO target
SELECT TABLE *
FROM customerOrders INNER JOIN deliveryAddress
    ON customerOrders.order_id = deliveryAddress.order_id;
```

### Joining and Grouping

To group the results of a join, provide a `GROUP BY` clause after the join expression.

```sql
SET defaults.topic.autocreate=true;
WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);

WITH joined AS (
    SELECT STREAM
        customersTable.name
        , ordersStream.item
     FROM
        ordersStream JOIN customersTable
            ON customersTable.id = ordersStream.customer_id
    GROUP BY customersTable.name
);
```

### Stream-Table/Table-Stream joins: table materialization

**Customers**

| arrival | processed | id | name  |
| ------- | --------- | -- | ----- |
| t = 0s  | t = 20s   | 1  | Frank |

**Purchases**

| arrival | processed | item     | customer\_id |
| ------- | --------- | -------- | ------------ |
| t = 0s  | t = 10s   | Computer | 1            |

```sql
WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);

INSERT INTO result
SELECT TABLE
    customersTable.name
    , ordersStream.item
 FROM
    ordersStream OUTER JOIN customersTable
        ON customersTable.id = ordersStream.customer_id;
```

When a join between a table and a stream is processed, Lenses looks up, for each stream record (orders in the example above), a matching record in the specified table (customers).

Notice that the record with Frank’s purchase information is processed at `t = 10s`, at which point Frank’s customer information hasn’t been processed yet. This means that no match is found for this record.

At `t = 20s`, however, the record with Frank’s customer information is processed; this triggers the emission of a new record only if an OUTER JOIN is used.

### Filter optimizations

In some cases, filter expressions can help optimize a query. When a filter can be broken down into multiple steps, some of them can be applied before the join node is evaluated. This reduces the number of records entering the join node and consequently increases its speed.

For this reason, some filters will show up before the join node in the topology.
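One way to picture this optimization is to split a `WHERE` clause into its AND-ed conditions and route each one by the sources it references. The sketch below is hypothetical and only safe for an inner join: for non-inner joins, moving a condition below the side that can be null may change the result, so a real optimizer has to be more careful. `split_filter` is an illustrative name, not a Lenses API.

```python
# Hypothetical sketch of splitting a conjunctive WHERE clause for an INNER JOIN:
# each AND-ed condition that references only one source can run before the join.
def split_filter(conditions, left, right):
    """conditions: list of (text, set of sources referenced).
    Returns the conditions to apply to each source before the join, and after it."""
    plan = {left: [], right: [], "after_join": []}
    for text, sources in conditions:
        if sources == {left}:
            plan[left].append(text)
        elif sources == {right}:
            plan[right].append(text)
        else:
            plan["after_join"].append(text)
    return plan


where = [
    ('orders.item = "Mouse"', {"orders"}),
    ("len(customers.name) > 3", {"customers"}),
    ("customers._key + orders._key > 2", {"customers", "orders"}),
]
for step, conds in split_filter(where, "orders", "customers").items():
    print(step, conds)
```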

### Lateral Joins

With Lateral Joins, you can combine a data source with any array expression. As a result, you will get a new data source, where every record of the original one will be *joined* with the values of the lateral array expression.

Assume you have a `source` where `elements` is an array field:

| field1 | field2 | elements   |
| ------ | ------ | ---------- |
| a      | 1      | \[1, 2]    |
| b      | 2      | \[3, 4, 5] |
| c      | 3      | \[6]       |

Then a Lateral Join of `source` with `elements` is a new table where every record of `source` will be joined with all the single items of the value of `elements` for that record:

| field1 | field2 | elements   | element |
| ------ | ------ | ---------- | ------- |
| a      | 1      | \[1, 2]    | 1       |
| a      | 1      | \[1, 2]    | 2       |
| b      | 2      | \[3, 4, 5] | 3       |
| b      | 2      | \[3, 4, 5] | 4       |
| b      | 2      | \[3, 4, 5] | 5       |
| c      | 3      | \[6]       | 6       |

In this way, the single elements of the array become available and can be used as a normal field in the query.
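The lateral join can be modelled in plain Python as a nested loop: each record is copied once per element of its array, with the element stored under the alias. This is an illustration of the semantics only; `lateral_join` is a hypothetical name, and in this model a record whose array is empty produces no output rows.

```python
# Plain-Python model of a lateral join (illustration only, not Lenses code).
def lateral_join(source, array_expression, alias):
    """Pair every record with each element of array_expression(record)."""
    rows = []
    for record in source:
        for value in array_expression(record):
            rows.append({**record, alias: value})
    return rows


source = [
    {"field1": "a", "field2": 1, "elements": [1, 2]},
    {"field1": "b", "field2": 2, "elements": [3, 4, 5]},
    {"field1": "c", "field2": 3, "elements": [6]},
]

for row in lateral_join(source, lambda record: record["elements"], "element"):
    print(row)
```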

### Syntax  <a href="#syntax" id="syntax"></a>

A query using lateral joins looks like a regular query apart from the definition of its source:

```sql
SELECT (STREAM|TABLE)
  <projection>
FROM
  <source> LATERAL
  <lateralArrayExpression> AS <lateralAlias>
WHERE
  <filterExpression>;
```

* **projection**: as in a single-table select, all the fields of `<source>` are available in the projection. In addition, a special field `<lateralAlias>` is available.
* **source**: the source of data. Note: it is not possible to specify a normal join as the source of a lateral join. This limitation will be removed in the future.
* **lateralArrayExpression**: any expression that evaluates to an array. The fields of `<source>` can be used in this expression.
* **filterExpression**: a filter expression specifying which records should be kept.

### Single Lateral Joins  <a href="#single-lateral-joins" id="single-lateral-joins"></a>

Assume you have a topic *batched\_readings* populated with the following records:

**batched\_readings**

| \_key | meter\_id | readings           |
| ----- | --------- | ------------------ |
| a     | 1         | \[100, 80, 95, 91] |
| b     | 2         | \[87, 93, 100]     |
| c     | 1         | \[88, 89, 92, 94]  |
| d     | 2         | \[81]              |

As you can see, `readings` is a field containing arrays of integers.

We define a processor like this:

```sql
INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
 FROM
    batched_readings
    LATERAL readings AS reading
WHERE
    reading > 90;
```

The processor will emit the following records:

| \_key | meter\_id | reading |
| ----- | --------- | ------- |
| a     | 1         | 100     |
| a     | 1         | 95      |
| a     | 1         | 91      |
| b     | 2         | 93      |
| b     | 2         | 100     |
| c     | 1         | 92      |
| c     | 1         | 94      |

Things to notice:

* We used the aliased lateral expression `reading` both in the projection and in the `WHERE`.
* The `_key` of each emitted record is the `_key` of the original record it came from. As usual, you can change this behavior by projecting on the key with a projection like `expression AS _key`.
* `batched_readings` records with keys `a`, `b` and `c` have been split into multiple records, because each of them contains multiple readings greater than `90`.
* Record `d` disappeared because it has no readings greater than `90`.
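The processor above amounts to a nested loop with a filter. The plain-Python model below (an illustration only; `readings_above` is a hypothetical name) runs it over the sample `batched_readings` data. Note that record `b` contributes two readings, 93 and 100, both above 90.

```python
# Plain-Python model of the single lateral join processor (illustration only).
# (_key, meter_id, readings)
batched_readings = [
    ("a", 1, [100, 80, 95, 91]),
    ("b", 2, [87, 93, 100]),
    ("c", 1, [88, 89, 92, 94]),
    ("d", 2, [81]),
]


def readings_above(records, threshold):
    out = []
    for key, meter_id, readings in records:
        for reading in readings:        # LATERAL readings AS reading
            if reading > threshold:     # WHERE reading > 90
                # The _key is carried over from the source record.
                out.append((key, meter_id, reading))
    return out


for row in readings_above(batched_readings, 90):
    print(row)
```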

### Multiple Lateral Joins  <a href="#multiple-lateral-joins" id="multiple-lateral-joins"></a>

It is possible to use multiple `LATERAL` joins in the same `FROM` clause.

Assume you have a topic *batched\_nested\_readings* populated with the following records:

**batched\_nested\_readings**

| \_key | meter\_id | nested\_readings         |
| ----- | --------- | ------------------------ |
| a     | 1         | \[\[100, 80], \[95, 91]] |
| b     | 2         | \[\[87], \[93, 100]]     |
| c     | 1         | \[\[88, 89], \[92, 94]]  |
| d     | 2         | \[\[81]]                 |

Notice how `nested_readings` contains arrays of arrays of integers.

To get the same results as the previous example, we use a first lateral join to unpack the first level of `nested_readings` into an array that we call `readings`. We then define a second lateral join on `readings` to extract the single values:

```sql
INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
 FROM
    batched_nested_readings
    LATERAL nested_readings AS readings
    LATERAL readings AS reading
WHERE
    reading > 90;
```
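Each `LATERAL` clause adds one more level of iteration. The plain-Python model below (illustration only; `nested_readings_above` is a hypothetical name) shows that two nested loops over `batched_nested_readings` yield the same rows as the single-level example.

```python
# Plain-Python model of the two chained lateral joins (illustration only).
# (_key, meter_id, nested_readings)
batched_nested_readings = [
    ("a", 1, [[100, 80], [95, 91]]),
    ("b", 2, [[87], [93, 100]]),
    ("c", 1, [[88, 89], [92, 94]]),
    ("d", 2, [[81]]),
]


def nested_readings_above(records, threshold):
    out = []
    for key, meter_id, nested in records:
        for readings in nested:          # LATERAL nested_readings AS readings
            for reading in readings:     # LATERAL readings AS reading
                if reading > threshold:  # WHERE reading > 90
                    out.append((key, meter_id, reading))
    return out


for row in nested_readings_above(batched_nested_readings, 90):
    print(row)
```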

### Complex Lateral expressions  <a href="#complex-lateral-expressions" id="complex-lateral-expressions"></a>

In the previous example, we used a simple field as the `<lateralArrayExpression>`. This section will show how any array expression can be used.

Assume you have a topic *day\_night\_readings* populated with the following records:

**day\_night\_readings**

| \_key | meter\_id | readings\_day | readings\_night |
| ----- | --------- | ------------- | --------------- |
| a     | 1         | \[100, 80]    | \[95, 91]       |
| b     | 2         | \[87, 93]     | \[100]          |
| c     | 1         | \[88]         | \[89, 92, 94]   |
| d     | 2         | \[81]         | \[]             |

We can make use of Array Functions to lateral join `day_night_readings` on the concatenation of the two readings fields:

```sql
INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
 FROM
    day_night_readings
    LATERAL flatten([readings_day, readings_night]) AS reading
WHERE
    reading > 90;
```

The processor defined this way will emit the following records:

| \_key | meter\_id | reading |
| ----- | --------- | ------- |
| a     | 1         | 100     |
| a     | 1         | 95      |
| a     | 1         | 91      |
| b     | 2         | 93      |
| b     | 2         | 100     |
| c     | 1         | 92      |
| c     | 1         | 94      |
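A plain-Python model of this query is below. It assumes that `flatten` concatenates a list of arrays by one level, which is consistent with the output shown but is an assumption about the Lenses function; both `flatten` and `readings_above` here are hypothetical stand-ins.

```python
# Plain-Python model of the flatten-based lateral join (illustration only).
# (_key, meter_id, readings_day, readings_night)
day_night_readings = [
    ("a", 1, [100, 80], [95, 91]),
    ("b", 2, [87, 93], [100]),
    ("c", 1, [88], [89, 92, 94]),
    ("d", 2, [81], []),
]


def flatten(arrays):
    """Assumed behaviour: concatenate a list of arrays into a single array (one level)."""
    return [value for array in arrays for value in array]


def readings_above(records, threshold):
    out = []
    for key, meter_id, day, night in records:
        # LATERAL flatten([readings_day, readings_night]) AS reading
        for reading in flatten([day, night]):
            if reading > threshold:  # WHERE reading > 90
                out.append((key, meter_id, reading))
    return out


for row in readings_above(day_night_readings, 90):
    print(row)
```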
