Pipes and filters at DBMS-level: Splitting the MERGE output stream(DBMS级别的管道和过滤器:拆分合并输出流)
问题描述
方案
我们有一个非常标准的数据导入过程,在该过程中我们加载staging表,然后MERGE将其放入target表。
新需求(绿色)涉及捕获导入数据的子集
放到单独的queue表中以进行完全无关的处理。
"挑战"
(1)子集由以下记录的选择组成: 仅新插入target表。
(2)子集是一些插入列的投影,但也
至少有一列仅存在于源中(staging
表)。
(3)MERGE语句已使用OUTPUT..INTO子句
严格记录MERGE拍摄的$action,以便
PIVOT结果和COUNT插入、更新和
出于统计目的的删除。我们真的不喜欢缓冲
对整个数据集执行类似的操作,并且更倾向于聚合
飞翔上的金额。不用说,我们不想将更多数据添加到
此OUTPUT表。
(4)我们不想做MERGE
无论出于何种原因,即使是部分原因,都会执行第二次。这个
target表非常大,我们不能索引所有内容,并且
操作通常相当昂贵(分钟,而不是秒)。
(5)我们不考虑将MERGE的任何输出往返到
客户端只是为了使客户端可以通过以下方式将其路由到queue
马上把它送回去。数据必须保留在服务器上。
(6)我们希望避免在临时存储中缓冲整个数据集
在staging和queue之间。
做这件事的最佳方式是什么?
故障
(A)仅将插入的记录入队的要求阻止我们 的OUTPUT..INTO子句中直接以queue表为目标
MERGE,因为它不允许任何WHERE子句。我们可以用一些
CASE标记不需要的记录以供后续删除的诡计
来自queue而不进行处理,但这似乎很疯狂。
(B)因为queue的某些列没有出现在
target表中,我们不能简单地在目标上添加插入触发器
表来加载queue。"数据流拆分"必须更早发生。
(C)由于我们已经在MERGE中使用了OUTPUT..INTO子句,因此我们
无法添加第二个OUTPUT子句和将MERGE嵌套到
INSERT..SELECT加载队列。这太可惜了,因为它
感觉对一些可行的东西来说,这是一个完全武断的限制
否则很好;SELECT只过滤具有
$action我们希望(INSERT)和INSERT在queue中
声明。因此,从理论上讲,DBMS可以避免缓冲整个
数据集,只需将其流式传输到queue。(注:我们没有追查
而且很可能它实际上并没有以这种方式优化计划。)
情况
我们觉得我们已经用尽了我们的选择,但决定求助于Hivemind
可以肯定的是。我们能想到的只有:
(S1)创建target表的VIEW,该表也包含可为空的
仅用于queue的数据的列,并具有
SELECT语句将它们定义为NULL。然后,设置INSTEAD OF
同时填充target表和queue的触发器
恰如其分。最后,将MERGE连接到目标视图。这
是可行的,但是我们并不喜欢这个结构--它绝对是
看起来棘手。
(S2)放弃,使用以下命令在临时表中缓冲整个数据集
另一个MERGE..OUTPUT。在MERGE之后,立即复制数据
(再说一遍!)从临时表导入queue。
推荐答案
我的理解是,主要障碍是SQL Server中OUTPUT子句的限制。它允许一个OUTPUT INTO table和/或一个OUTPUT将结果集返回给调用方。
您希望以两种不同的方式保存MERGE语句的结果:
- 受
MERGE影响的所有行,用于收集统计信息 - 仅为
queue插入行
简单变量
我会使用您的S2解决方案。至少一开始是这样。它很容易理解和维护,而且应该非常高效,因为最耗费资源的操作(MERGEintoTarget本身只执行一次)。下面还有第二个变体,比较它们在真实数据上的性能会很有趣。
所以:
- 在
MERGE中使用 - 插入前
INSERT从@TempTable到Stats的所有行或聚合。如果您只需要聚合统计数据,则聚合此批次的结果并将其合并到最终Stats而不是复制所有行是有意义的。 INSERT到Queue中仅"插入"了@TempTable中的行。
OUTPUT INTO @TempTable
我将从@i-one的答案中提取样本数据。
架构
-- I'll return to commented lines later
CREATE TABLE [dbo].[TestTarget](
-- [ID] [int] IDENTITY(1,1) NOT NULL,
[foo] [varchar](10) NULL,
[bar] [varchar](10) NULL
);
CREATE TABLE [dbo].[TestStaging](
[foo] [varchar](10) NULL,
[bar] [varchar](10) NULL,
[baz] [varchar](10) NULL
);
CREATE TABLE [dbo].[TestStats](
[MergeAction] [nvarchar](10) NOT NULL
);
CREATE TABLE [dbo].[TestQueue](
-- [TargetID] [int] NOT NULL,
[foo] [varchar](10) NULL,
[baz] [varchar](10) NULL
);
样本数据
TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];
INSERT INTO [dbo].[TestStaging]
([foo]
,[bar]
,[baz])
VALUES
('A', 'AA', 'AAA'),
('B', 'BB', 'BBB'),
('C', 'CC', 'CCC');
INSERT INTO [dbo].[TestTarget]
([foo]
,[bar])
VALUES
('A', 'A_'),
('B', 'B?');
合并
DECLARE @TempTable TABLE (
MergeAction nvarchar(10) NOT NULL,
foo varchar(10) NULL,
baz varchar(10) NULL);
MERGE INTO TestTarget AS Dst
USING TestStaging AS Src
ON Dst.foo = Src.foo
WHEN MATCHED THEN
UPDATE SET
Dst.bar = Src.bar
WHEN NOT MATCHED BY TARGET THEN
INSERT (foo, bar)
VALUES (Src.foo, Src.bar)
OUTPUT $action AS MergeAction, inserted.foo, Src.baz
INTO @TempTable(MergeAction, foo, baz)
;
INSERT INTO [dbo].[TestStats] (MergeAction)
SELECT T.MergeAction
FROM @TempTable AS T;
INSERT INTO [dbo].[TestQueue]
([foo]
,[baz])
SELECT
T.foo
,T.baz
FROM @TempTable AS T
WHERE T.MergeAction = 'INSERT'
;
SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];
结果
TestTarget
+-----+-----+
| foo | bar |
+-----+-----+
| A | AA |
| B | BB |
| C | CC |
+-----+-----+
TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT |
| UPDATE |
| UPDATE |
+-------------+
TestQueue
+-----+-----+
| foo | baz |
+-----+-----+
| C | CCC |
+-----+-----+
第二个变量
在SQL Server 2014 Express上测试。
OUTPUT子句可以将其结果集发送到表和调用方。因此,OUTPUT INTO可以直接进入Stats,如果我们将MERGE语句包装到存储过程中,则可以使用INSERT ... EXEC进入Queue。
INSERT ... EXEC无论如何都会在后台创建一个临时表(另请参阅The Hidden Costs of INSERT EXECby
Adam Machanic),所以我预计当您显式创建临时表时,总体性能将类似于第一个变体。
还有一个问题需要解决:Queue表应该只有"插入"的行,而不是所有受影响的行。要实现这一点,您可以在Queue表上使用触发器来丢弃"INSERT"之外的行。另一种可能性是使用IGNORE_DUP_KEY = ON定义唯一索引,并以这样的方式准备数据:"未插入"的行将违反唯一索引,并且不会插入到表中。
因此,我将向Target表中添加一个ID IDENTITY列,并向Queue表中添加一个TargetID列。(在上面的脚本中取消对它们的注释)。
另外,我将向Queue表添加索引:
CREATE UNIQUE NONCLUSTERED INDEX [IX_TargetID] ON [dbo].[TestQueue]
(
[TargetID] ASC
) WITH (
PAD_INDEX = OFF,
STATISTICS_NORECOMPUTE = OFF,
SORT_IN_TEMPDB = OFF,
IGNORE_DUP_KEY = ON,
DROP_EXISTING = OFF,
ONLINE = OFF,
ALLOW_ROW_LOCKS = ON,
ALLOW_PAGE_LOCKS = ON)
重要部分是UNIQUE和IGNORE_DUP_KEY = ON。
以下是MERGE的存储过程:
CREATE PROCEDURE [dbo].[TestMerge]
AS
BEGIN
SET NOCOUNT ON;
SET XACT_ABORT ON;
MERGE INTO dbo.TestTarget AS Dst
USING dbo.TestStaging AS Src
ON Dst.foo = Src.foo
WHEN MATCHED THEN
UPDATE SET
Dst.bar = Src.bar
WHEN NOT MATCHED BY TARGET THEN
INSERT (foo, bar)
VALUES (Src.foo, Src.bar)
OUTPUT $action INTO dbo.TestStats(MergeAction)
OUTPUT CASE WHEN $action = 'INSERT' THEN inserted.ID ELSE 0 END AS TargetID,
inserted.foo,
Src.baz
;
END
用法
TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];
-- Make sure that `Queue` has one special row with TargetID=0 in advance.
INSERT INTO [dbo].[TestQueue]
([TargetID]
,[foo]
,[baz])
VALUES
(0
,NULL
,NULL);
INSERT INTO [dbo].[TestStaging]
([foo]
,[bar]
,[baz])
VALUES
('A', 'AA', 'AAA'),
('B', 'BB', 'BBB'),
('C', 'CC', 'CCC');
INSERT INTO [dbo].[TestTarget]
([foo]
,[bar])
VALUES
('A', 'A_'),
('B', 'B?');
INSERT INTO [dbo].[TestQueue]
EXEC [dbo].[TestMerge];
SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];
结果
TestTarget
+----+-----+-----+
| ID | foo | bar |
+----+-----+-----+
| 1 | A | AA |
| 2 | B | BB |
| 3 | C | CC |
+----+-----+-----+
TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT |
| UPDATE |
| UPDATE |
+-------------+
TestQueue
+----------+------+------+
| TargetID | foo | baz |
+----------+------+------+
| 0 | NULL | NULL |
| 3 | C | CCC |
+----------+------+------+
在INSERT ... EXEC期间将有额外的消息:
Duplicate key was ignored.
IFMERGE更新了一些行。当唯一索引由于IGNORE_DUP_KEY = ON而在INSERT期间丢弃某些行时发送此警告消息。
插入重复键值时将出现警告消息 转换为唯一索引。只有违反唯一性约束的行 将失败。
这篇关于DBMS级别的管道和过滤器:拆分合并输出流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:DBMS级别的管道和过滤器:拆分合并输出流
基础教程推荐
- mysql选择动态行值作为列名,另一列作为值 2021-01-01
- MySQL 中的类型:BigInt(20) 与 Int(20) 2021-01-01
- 表 './mysql/proc' 被标记为崩溃,应该修复 2022-01-01
- 如何根据该 XML 中的值更新 SQL 中的 XML 2021-01-01
- 什么是 orradiag_<user>文件夹? 2022-01-01
- 二进制文件到 SQL 数据库 Apache Camel 2021-01-01
- 在多列上分布任意行 2021-01-01
- 如何在 SQL 中将 Float 转换为 Varchar 2021-01-01
- oracle区分大小写的原因? 2021-01-01
- 在 MySQL 中:如何将表名作为存储过程和/或函数参数传递? 2021-01-01
