delta lake的merge操作以及性能调优是怎样的


delta lake的merge操作以及性能调优是怎样的,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。鉴于merge操作的复杂性,下面主要对其进行展开讲解。1.merge算子操作语法
merge操作的sql表达如下:
import io.delta.tables._importorg.apache.spark.sql.functions._

DeltaTable.forPath(spark, "/data/events/") .as("events") .merge( updatesDF.as("updates"), "events.eventId = updates.eventId") .whenMatched .updateExpr( Map("data" -> "updates.data")) .whenNotMatched .insertExpr( Map( "date" -> "updates.date", "eventId" -> "updates.eventId", "data" -> "updates.data")) .execute()merge 编码操作还是有些约束需要详细描述的。
1.1 可以有(1,2,3)个wenMatched或者whenNotMatched的子语句。其中,whenMatched操作最多有两个语句,whenNotMatched最多有一个子语句。
1.2 当源表的数据和目标表的数据满足匹配条件的时候,执行的是whenMatched语句。这些语句可以有以下几个语义:
a) whenMatched语句最多有一个update和一个delete表达。merge中的update行为仅仅更新满足条件的目标表一行数据的指定列。而delete操作会删除所有匹配的行。
b)每个whenMatched语句都可以有一个可选的条件。如果该可选的条件存在,update和delete操作仅仅在该可选条件为true的时候,才会在匹配的目标数据上执行相应操作。
c)如果有两个whenMatched子句,则将按照它们被指定的顺序(即,子句的顺序很重要)进行执行。第一个子句必须具有一个子句条件(否则,第二个子句将永远不会执行)。d)如果两个whenMatched子语句都有条件并且两个子语句的条件都不为true,那不会对目标数据进行任何修改。
c) 支持满足条件的源dataset中相关行的所有列同时更新到目标detla表的相关列,表达式如下:等价于:要保证源表和目标表有相同的列,否则会抛出异常。1.3 给定的条件,源表的一行数据,跟目标表没有完成匹配的时候执行whenNotMatched语句。该子语句有以下语法:
a) whenNotMatched仅仅支持insert表达。根据指定的列和相关的条件,该操作会在目标表中插入一条新的数据,当目标表中存在的列没有明确的指定的时候,就插入null。
b) whenNotMatched语句可以有可选条件。如果指定了可选条件,数据仅仅会在可选条件为true的时候才会插入。否则,源列会被忽略。
c)也可以插入匹配目标表相关行的所有源表行的数据列,表达式:
等价于:
要保证源表和目标表有相同的列,否则就会抛出异常。
2.schema校验merge操作会自动校验insert和update操作产生额数据schema是否与目标表的schema匹配。规则如下:
a)对于update和insert行为,指定的目标列必须在目标delta lake表中存在。b)对于updateAll和insertAll操作,源dataset必须包含所有目标表的列。源dataset可以有目标表中不存在的列,但是这些列会被忽略。当然也可以通过配置保留仅源dataset有的列。c)对于所有操作,如果由生成目标列的表达式生成的数据类型与目标Delta表中的对应列不同,则merge尝试将其强制转换为表中的类型。3.自动schema转换
默认情况下,updateAll和insertAll操作仅仅会更新或插入在目标表中有的相同列名的列,对于仅仅在源dataset中存在而目标表中不存在的列,会被忽略。但是有些场景下,我们希望保留源dataset中新增的列。首先需要将前面介绍的一个参数spark.databricks.delta.schema.autoMerge.enabled设置为true。注意:
a. schema自动增加仅仅是针对updateAll操作或者 香港云主机insertAll操作,或者两者。b. 仅仅顶层的列会被更改,而不是嵌套的列。
c. 更新和插入操作不能显式引用目标表中不存在的目标列(即使其中有updateAll或insertAll作为子句之一)。4.schema推断与否对比
据一些例子,进行schema自动推断与不自动推断的对比对比一目标列(key,value),源列(key,value,newValue),对源源表执行下面的sql操作:没有使用自动schema推断的话:目标表的schema信息是不会变的。仅仅key,value列被更新。
使用了schema推断的话:表的schema就会演变为(key,value,newValue)。updateAll操作,会更新value和newValue列。对于insertAll操作会插入整行(key,value,newValue)。对比二
目标表(key,oldValue),源表(key,newValue),对源表执行下面的sql:
不使用schema推断:updateAll和insertAll操作都会抛异常。使用schema推断:表的shema会演变为(key,oldValue,newValue)。updateAll操作会更新key和value列,而oldValue列不变。insertAll操作会插入(key,null,newValue),oldValue会插入null。对比三
目标表(key,oldValue),源表(key,newValue),对源表执行下面的sql不使用schema推断:update操作会抛出异常,因为newValue在目标表中并不存在。
使用schema推断:update操作会抛出异常,因为newValue在目标表中并不存在。对比四:
目标表(key,oldValue),源表(key,newValue),对源表执行下面的sql不使用schema推断:insert操作会抛出异常,因为newValue在目标表中并不存在。使用schema推断:insert操作依然会抛出异常,因为newValue在目标表中并不存在。5.性能调优
下面几个方法可以有效减少merge的处理时间:
a.减少匹配查找的数据量
默认情况下,merge操作会扫描整个delta lake表找到满足条件的数据。可以加些谓词,以减少数据量。比如,数据是以country和date进行分区的,而你只想更新特定国家的昨天的数据。就可以增加一些条件,比如:这样就只会处理指定分区的数据,大大减少了数据扫描量。也可以避免不同分区之间操作的一些冲突。
b.合并文件
如果数据存储的时候有很多小文件,就会降低数据的读取速度。可以合并小文件成一些大文件,来提升读取的速度。后面会说到这个问题。
c.控制shuffle的分区数
为了计算和更新数据,merge操作会对数据进行多次shuffle。shuffle过程中task数量是由参数spark.sql.shuffle.partitions来设置,默认是200。该参数不仅能控制shuffle的并行度,也能决定输出的文件数。增加这个值虽然可以增加并行度,但也相应的增加了产生小文件数。
d.写出数据之间进行重分区
对与分区表,merge操作会产生很多小文件,会比shuffle分区数多很多。原因是每个shuffle任务会为多分区表产生更多的文件,这可能会是一个性能瓶颈。所以,很多场景中使用表的分区列对数据进行写入前重分区是很有效的。可以通过设置spark.delta.merge.repartitionBeforeWrite为true来生效。关于delta lake的merge操作以及性能调优是怎样的问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注开发云行业资讯频道了解更多相关知识。

相关推荐: C++如何使用泛型lambda表达式​

本篇内容介绍了“C++如何使用泛型lambda表达式”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!C.170: 如果需要重载lambda表达式,使用泛型lamb…

免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。

(0)
打赏 微信扫一扫 微信扫一扫
上一篇 10/09 16:54
下一篇 10/09 16:54

相关推荐