发布时间:2025-11-05 08:06:55 来源:码上建站 作者:域名

大家好,知其字详作我是所然述老羊,今天我们来学习 Flink SQL 中的两万· Join 操作。
Flink 支持了非常多的知其字详作数据 Join 方式,主要包括以下三种:
动态表(流)与动态表(流)的所然述 Join。动态表(流)与外部维表(比如 Redis)的两万 Join。动态表字段的知其字详作列转行(一种特殊的 Join)。细分 Flink SQL 支持的所然述 Join:
Regular Join:流与流的 Join,包括 Inner Equal Join、两万Outer Equal Join。知其字详作Interval Join:流与流的所然述 Join,两条流一段时间区间内的两万 Join。Temporal Join:流与流的知其字详作 Join,包括事件时间,所然述处理时间的两万 Temporal Join,类似于离线中的快照 Join。Lookup Join:流与外部维表的 Join。Array Expansion:表字段的列转行,云服务器类似于 Hive 的 explode 数据炸开的列转行。Table Function:自定义函数的表字段的列转行,支持 Inner Join 和 Left Outer Join。下面这个案例为 Inner Join 案例:
复制-- 曝光日志数据CREATE TABLE show_log_table ( log_id BIGINT,show_params STRING
) WITH ( connector = datagen, rows-per-second = 2, fields.show_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 100);-- 点击日志数据CREATE TABLE click_log_table ( log_id BIGINT,click_params STRING
)WITH ( connector = datagen, rows-per-second = 2, fields.click_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10);CREATE TABLE sink_table ( s_id BIGINT, s_params STRING, c_id BIGINT,c_params STRING
) WITH ( connector = print);-- 流的 INNER JOIN,条件为 log_idINSERT INTOsink_table
SELECT show_log_table.log_id as s_id, show_log_table.show_params as s_params, click_log_table.log_id as c_id, click_log_table.click_params asc_params
FROMshow_log_table
INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.输出结果如下:
复制+I[5, d, 5, f]+I[5, d, 5, 8]+I[5, d, 5, 2]+I[3, 4, 3, 0]+I[3, 4, 3, 3]...1.2.3.4.5.6.如果为 Left Join 案例:
复制CREATE TABLE show_log_table ( log_id BIGINT,show_params STRING
) WITH ( connector = datagen, rows-per-second = 1, fields.show_params.length = 3, fields.log_id.min = 1, fields.log_id.max = 10);CREATE TABLE click_log_table ( log_id BIGINT,click_params STRING
)WITH ( connector = datagen, rows-per-second = 1, fields.click_params.length = 3, fields.log_id.min = 1, fields.log_id.max = 10);CREATE TABLE sink_table ( s_id BIGINT, s_params STRING, c_id BIGINT,c_params STRING
) WITH ( connector = print);INSERT INTOsink_table
SELECT show_log_table.log_id as s_id, show_log_table.show_params as s_params, click_log_table.log_id as c_id, click_log_table.click_params asc_params
FROMshow_log_table
LEFT JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.输出结果如下:
复制+I[5, f3c, 5, c05]+I[5, 6e2, 5, 1f6]+I[5, 86b, 5, 1f6]+I[5, f3c, 5, 1f6]-D[3, 4ab, null, null]-D[3, 6f2, null, null]+I[3, 4ab, 3, 765]+I[3, 6f2, 3, 765]+I[2, 3c4, null, null]+I[3, 4ab, 3, a8b]+I[3, 6f2, 3, a8b]+I[2, c03, null, null]...1.2.3.4.5.6.7.8.9.10.11.12.13.如果为 Full Join 案例:
复制CREATE TABLE show_log_table ( log_id BIGINT,show_params STRING
) WITH ( connector = datagen, rows-per-second = 2, fields.show_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10);CREATE TABLE click_log_table ( log_id BIGINT,click_params STRING
)WITH ( connector = datagen, rows-per-second = 2, fields.click_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10);CREATE TABLE sink_table ( s_id BIGINT, s_params STRING, c_id BIGINT,c_params STRING
) WITH ( connector = print);INSERT INTOsink_table
SELECT show_log_table.log_id as s_id, show_log_table.show_params as s_params, click_log_table.log_id as c_id, click_log_table.click_params asc_params
FROMshow_log_table
FULL JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.输出结果如下:
复制+I[null, null, 7, 6]+I[6, 5, null, null]-D[1, c, null, null]+I[1, c, 1, 2]+I[3, 1, null, null]+I[null, null, 7, d]+I[10, 0, null, null]+I[null, null, 2, 6]-D[null, null, 7, 6]-D[null, null, 7, d]...1.2.3.4.5.6.7.8.9.10.11.关于 Regular Join 的b2b信息网注意事项:
实时 Regular Join 可以不是 等值 join。等值 join 和 非等值 join 区别在于,等值 join 数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联。Join 的流程是左流新来一条数据之后,会和右流中符合条件的所有数据做 Join,然后输出。流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大。SQL 语义:详细的 SQL 语义案例可以参考:
flink sql 知其所以然(十二):流 join 很难嘛???(上)。
flink sql 知其所以然(十三):流 join 很难嘛???(下)。
可以发现 Inner Interval Join 和其他三种 Outer Interval Join 的区别在于,Outer 在随着时间推移的过程中,如果有数据过期了之后,会根据是否是 Outer 将没有 Join 到的数据也给输出。
实际案例:还是刚刚的案例,曝光日志关联点击日志筛选既有曝光又有点击的数据,条件是曝光关联之后发生 4 小时之内的点击,并且补充点击的扩展参数(show inner interval click):下面为 Inner Interval Join:
复制CREATE TABLE show_log_table ( log_id BIGINT, show_params STRING, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time ASrow_time
) WITH ( connector = datagen, rows-per-second = 1, fields.show_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10);CREATE TABLE click_log_table ( log_id BIGINT, click_params STRING, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time ASrow_time
)WITH ( connector = datagen, rows-per-second = 1, fields.click_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10);CREATE TABLE sink_table ( s_id BIGINT, s_params STRING, c_id BIGINT,c_params STRING
) WITH ( connector = print);INSERT INTOsink_table
SELECT show_log_table.log_id as s_id, show_log_table.show_params as s_params, click_log_table.log_id as c_id, click_log_table.click_params asc_params
FROM show_log_table INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_idAND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL 4 HOUR AND click_log_table.row_time;1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.输出结果如下:
复制6> +I[2, a, 2, 6]6> +I[2, 6, 2, 6]2> +I[4, 1, 4, 5]2> +I[10, 8, 10, d]2> +I[10, 7, 10, d]2> +I[10, d, 10, d]2> +I[5, b, 5, d]6> +I[1, a, 1, 7]1.2.3.4.5.6.7.8.如果是 Left Interval Join:
复制CREATE TABLE show_log ( log_id BIGINT, show_params STRING, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time ASrow_time
) WITH ( connector = datagen, rows-per-second = 1, fields.show_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10);CREATE TABLE click_log ( log_id BIGINT, click_params STRING, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time ASrow_time
)WITH ( connector = datagen, rows-per-second = 1, fields.click_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10);CREATE TABLE sink_table ( s_id BIGINT, s_params STRING, c_id BIGINT,c_params STRING
) WITH ( connector = print);INSERT INTOsink_table
SELECT show_log.log_id as s_id, show_log.show_params as s_params, click_log.log_id as c_id, click_log.click_params asc_params
FROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_idAND show_log.row_time BETWEEN click_log.row_time - INTERVAL 5 SECOND AND click_log.row_time + INTERVAL 5 SECOND;1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.输出结果如下:
复制+I[6, e, 6, 7]+I[11, d, null, null]+I[7, b, null, null]+I[8, 0, 8, 3]+I[13, 6, null, null]1.2.3.4.5.如果是 Full Interval Join:
复制CREATE TABLE show_log ( log_id BIGINT, show_params STRING, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time ASrow_time
) WITH ( connector = datagen, rows-per-second = 1, fields.show_params.length = 1, fields.log_id.min = 5, fields.log_id.max = 15);CREATE TABLE click_log ( log_id BIGINT, click_params STRING, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), WATERMARK FOR row_time ASrow_time
)WITH ( connector = datagen, rows-per-second = 1, fields.click_params.length = 1, fields.log_id.min = 1, fields.log_id.max = 10);CREATE TABLE sink_table ( s_id BIGINT, s_params STRING, c_id BIGINT,c_params STRING
) WITH ( connector = print);INSERT INTOsink_table
SELECT show_log.log_id as s_id, show_log.show_params as s_params, click_log.log_id as c_id, click_log.click_params asc_params
FROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_idAND show_log.row_time BETWEEN click_log.row_time - INTERVAL 5 SECOND AND click_log.row_time + INTERVAL 5 SECOND;1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.输出结果如下:
复制+I[6, 1, null, null]+I[7, 3, 7, 8]+I[null, null, 6, 6]+I[null, null, 4, d]+I[8, d, null, null]+I[null, null, 3, b]1.2.3.4.5.6.关于 Interval Join 的注意事项:
实时 Interval Join 可以不是 等值 join。等值 join 和 非等值 join 区别在于,等值 join 数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,然后将满足条件的数据进行关联输出。
SQL 语义:关于详细的 SQL 语义可以参考。
flink sql 知其所以然(十三):流 join 很难嘛???(下)。
ENFORCED
) WITH ( connector = kafka, value.format = debezium-json, /* ... */);1.2.3.4.5.6.7.8.9.10.11.12.13. Deduplicate 定义方式: 复制-- 定义一个 append-only 的数据源表CREATE TABLE currency_rates ( currency STRING, conversion_rate DECIMAL(32, 2), update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL, WATERMARK FOR update_time ASupdate_time
) WITH ( connector = kafka, value.format = debezium-json, /* ... */);-- 将数据源表按照 Deduplicate 方式定义为 Versioned TableCREATE VIEW versioned_rates ASSELECT currency, conversion_rate, update_time -- 1. 定义 `update_time` 为时间字段 FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY currency -- 2. 定义 `currency` 为主键 ORDER BY update_time DESC -- 3. ORDER BY 中必须是时间戳列 ) ASrownum
FROM currency_rates)WHERE rownum = 1;1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22. Temporal Join 支持的时间语义:事件时间、处理时间。实际案例:就是上文提到的汇率计算。以 事件时间 任务举例:
复制-- 1. 定义一个输入订单表CREATE TABLE orders ( order_id STRING, price DECIMAL(32,2), currency STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time ASorder_time
) WITH (/* ... */);-- 2. 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到CREATE TABLE currency_rates ( currency STRING, conversion_rate DECIMAL(32, 2), update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL, WATERMARK FOR update_time AS update_time, PRIMARY KEY(currency) NOTENFORCED
) WITH ( connector = kafka, value.format = debezium-json, /* ... */);SELECT order_id, price, currency, conversion_rate, order_time,FROMorders
-- 3. Temporal Join 逻辑-- SQL 语法为:FOR SYSTEM_TIME AS OFLEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_timeON orders.currency = currency_rates.currency;1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.结果如下,可以看到相同的货币汇率会根据具体数据的事件时间不同 Join 到对应时间的汇率:
复制order_id price 货币 汇率 order_time
======== ===== ======== =============== =========o_001 11.11 EUR 1.14 12:00:00o_002 12.51 EUR 1.10 12:06:001.2.3.4.注意:
⭐ 事件时间的 Temporal Join 一定要给左右两张表都设置 Watermark。⭐ 事件时间的 Temporal Join 一定要把 Versioned Table 的主键包含在 Join on 的条件中。还是相同的案例,如果是 处理时间 语义:
复制10:15> SELECT * FROM LatestRates;currency rate
======== ======US Dollar 102Euro 114Yen 110:30> SELECT * FROM LatestRates;currency rate
======== ======US Dollar 102Euro 114Yen 1-- 10:42 时,Euro 的汇率从 114 变为 11610:52> SELECT * FROM LatestRates;currency rate
======== ======US Dollar 102Euro 116 <==== 从 114 变为 116Yen 1-- 从 Orders 表查询数据SELECT * FROM Orders;amount currency
====== ========= 2 Euro <== 在处理时间 10:15到达的一条数据
1 US Dollar <== 在处理时间 10:30到达的一条数据
2 Euro <== 在处理时间 10:52到达的一条数据
-- 执行关联查询SELECT o.amount, o.currency, r.rate, o.amount * r.rateFROM Orders ASo
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime ASr
ON r.currency = o.currency-- 结果如下:amount currency rate amount*rate
====== ========= ======= ============ 2 Euro 114 228 <== 在处理时间 10:15到达的一条数据
1 US Dollar 102 102 <== 在处理时间 10:30到达的一条数据
2 Euro 116 232 <== 在处理时间 10:52 到达的一条数据1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.可以发现处理时间就比较好理解了,因为处理时间语义中是根据左流数据到达的时间决定拿到的汇率值。Flink 就只为 LatestRates 维护了最新的状态数据,不需要关心历史版本的数据。
来一波输入数据:
曝光用户日志流(show_log)数据(数据存储在 kafka 中):
复制log_id timestampuser_id
1 2021-11-01 00:01:03a
2 2021-11-01 00:03:00b
3 2021-11-01 00:05:00c
4 2021-11-01 00:06:00b
5 2021-11-01 00:07:00 c1.2.3.4.5.6.用户画像维表(user_profile)数据(数据存储在 redis 中):
复制user_id(主键)age sex
a 12-18男
b 18-24女
c 18-24 男1.2.3.4.注意:
redis 中的数据结构存储是按照 key,value 去存储的。其中 key 为 user_id,value 为 age,sex 的 json。
具体 SQL:
复制CREATE TABLE show_log ( log_id BIGINT, `timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)), user_id STRING, proctime AS PROCTIME())WITH ( connector = datagen, rows-per-second = 10, fields.user_id.length = 1, fields.log_id.min = 1, fields.log_id.max = 10);CREATE TABLE user_profile ( user_id STRING, age STRING,sex STRING
) WITH ( connector = redis, hostname = 127.0.0.1, port = 6379, format = json, lookup.cache.max-rows = 500, lookup.cache.ttl = 3600, lookup.max-retries = 1);CREATE TABLE sink_table ( log_id BIGINT, `timestamp` TIMESTAMP(3), user_id STRING, proctime TIMESTAMP(3), age STRING,sex STRING
) WITH ( connector = print);-- lookup join 的 query 逻辑INSERT INTOsink_table
SELECT s.log_id aslog_id
, s.`timestamp` as`timestamp`
, s.user_id asuser_id
, s.proctime asproctime
, u.sex assex
, u.age asage
FROM show_log ASs
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime ASu
ON s.user_id = u.user_id1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.输出数据如下:
复制log_id timestampuser_id age sex
1 2021-11-01 00:01:03 a 12-18男
2 2021-11-01 00:03:00 b 18-24女
3 2021-11-01 00:05:00 c 18-24男
4 2021-11-01 00:06:00 b 18-24女
5 2021-11-01 00:07:00 c 18-24 男1.2.3.4.5.6.注意:
实时的 lookup 维表关联能使用 处理时间 去做关联。
SQL 语义:详细 SQL 语义及案例可见:
flink sql 知其所以然:维表 join 的性能优化之路(上)附源码。
flink sql 知其所以然:改了改源码,实现了个 batch lookup join(附源码)。
其实,Flink 官方并没有提供 redis 的维表 connector 实现。
没错,博主自己实现了一套。关于 redis 维表的 connector 实现,直接参考下面的文章。都是可以从 github 上找到源码拿来用的!
注意:
同一条数据关联到的维度数据可能不同:实时数仓中常用的实时维表都是在不断的变化中的,当前流表数据关联完维表数据后,如果同一个 key 的维表的数据发生了变化,已关联到的维表的结果数据不会再同步更新。举个例子,维表中 user_id 为 1 的数据在 08:00 时 age 由 12-18 变为了 18-24,那么当我们的任务在 08:01 failover 之后从 07:59 开始回溯数据时,原本应该关联到 12-18 的数据会关联到 18-24 的 age 数据。这是有可能会影响数据质量的。所以小伙伴萌在评估你们的实时任务时要考虑到这一点。会发生实时的新建及更新的维表博主建议小伙伴萌应该建立起数据延迟的监控机制,防止出现流表数据先于维表数据到达,导致关联不到维表数据。再说说维表常见的性能问题及优化思路。
所有的维表性能问题都可以总结为:高 qps 下访问维表存储引擎产生的任务背压,数据产出延迟问题。
举个例子:
在没有使用维表的情况下:一条数据从输入 Flink 任务到输出 Flink 任务的时延假如为 0.1 ms,那么并行度为 1 的任务的吞吐可以达到 1 query / 0.1 ms = 1w qps。在使用维表之后:每条数据访问维表的外部存储的时长为 2 ms,那么一条数据从输入 Flink 任务到输出 Flink 任务的时延就会变成 2.1 ms,那么同样并行度为 1 的任务的吞吐只能达到1 query / 2.1 ms = 476 qps。两者的吞吐量相差 21 倍。这就是为什么维表 join 的算子会产生背压,任务产出会延迟。
那么当然,解决方案也是有很多的。抛开 Flink SQL 想一下,如果我们使用 DataStream API,甚至是在做一个后端应用,需要访问外部存储时,常用的优化方案有哪些?这里列举一下:
按照 redis 维表的 key 分桶 + local cache:通过按照 key 分桶的方式,让大多数据的维表关联的数据访问走之前访问过得 local cache 即可。这样就可以把访问外部存储 2.1 ms 处理一个 query 变为访问内存的 0.1 ms 处理一个 query 的时长。异步访问外存:DataStream api 有异步算子,可以利用线程池去同时多次请求维表外部存储。这样就可以把 2.1 ms 处理 1 个 query 变为 2.1 ms 处理 10 个 query。吞吐可变优化到 10 / 2.1 ms = 4761 qps。批量访问外存:除了异步访问之外,我们还可以批量访问外部存储。举一个例子:在访问 redis 维表的 1 query 占用 2.1 ms 时长中,其中可能有 2 ms 都是在网络请求上面的耗时 ,其中只有 0.1 ms 是 redis server 处理请求的时长。那么我们就可以使用 redis 提供的 pipeline 能力,在客户端(也就是 flink 任务 lookup join 算子中),攒一批数据,使用 pipeline 去同时访问 redis sever。这样就可以把 2.1 ms 处理 1 个 query 变为 7ms(2ms + 50 * 0.1ms) 处理 50 个 query。吞吐可变为 50 query / 7 ms = 7143 qps。博主这里测试了下使用 redis pipeline 和未使用的时长消耗对比。如下图所示。博主认为上述优化效果中,最好用的是 1 + 3,2 相比 3 还是一条一条发请求,性能会差一些。
既然 DataStream 可以这样做,Flink SQL 必须必的也可以借鉴上面的这些优化方案。具体怎么操作呢?看下文骚操作:
按照 redis 维表的 key 分桶 + local cache:sql 中如果要做分桶,得先做 group by,但是如果做了 group by 的聚合,就只能在 udaf 中做访问 redis 处理,并且 udaf 产出的结果只能是一条,所以这种实现起来非常复杂。我们选择不做 keyby 分桶。但是我们可以直接使用 local cache 去做本地缓存,虽然【直接缓存】的效果比【先按照 key 分桶再做缓存】的效果差,但是也能一定程度上减少访问 redis 压力。在博主实现的 redis connector 中,内置了 local cache 的实现,小伙伴萌可以参考下面这部篇文章进行配置。异步访问外存:目前博主实现的 redis connector 不支持异步访问,但是官方实现的 hbase connector 支持这个功能,参考下面链接文章的,点开之后搜索 lookup.async。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/。批量访问外存:这玩意官方必然没有实现啊,但是,但是,但是,经过博主周末两天的疯狂 debug,改了改源码,搞定了基于 redis 的批量访问外存优化的功能。具体可以参考下文。flink sql 知其所以然:改了改源码,实现了个 batch lookup join(附源码)。
show_param STRING
) WITH ( connector = print);INSERT INTOsink_table
SELECT log_id, t.show_param asshow_param
FROMshow_log_table
-- array 炸开语法CROSS JOIN UNNEST(show_params) AS t (show_param)1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.show_log_table 原始数据:
复制+I[7, [a, b, c]]+I[5, [d, e, f]]1.2.输出结果如下所示:
复制-- +I[7, [a, b, c]] 一行转为 3 行+I[7, a]+I[7, b]+I[7, b]-- +I[5, [d, e, f]] 一行转为 3 行+I[5, d]+I[5, e]+I[5, f]1.2.3.4.5.6.7.8.自定义输出逻辑
if (userId <= 5) { // 一行转 1行
collect(1); } else { // 一行转 3行
collect(1); collect(2); collect(3); } } }}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.执行结果如下:
复制-- <= 5,则只有 1 行结果+I[3, 7, 1, 2021-05-01T18:23:42.560]-- > 5,则有行 3 结果+I[8, e, 1, 2021-05-01T18:23:42.560]+I[8, e, 2, 2021-05-01T18:23:42.560]+I[8, e, 3, 2021-05-01T18:23:42.560]-- <= 5,则只有 1 行结果+I[4, 9, 1, 2021-05-01T18:23:42.561]-- > 5,则有行 3 结果+I[8, c, 1, 2021-05-01T18:23:42.561]+I[8, c, 2, 2021-05-01T18:23:42.561]+I[8, c, 3, 2021-05-01T18:23:42.561]1.2.3.4.5.6.7.8.9.10.11.12.随便看看