本文共 2197 字,大约阅读时间需要 7 分钟。
Join是SQL语言中常用的操作,一般用于建立多表之间的连接关系。spark SQL有两类(三种)Join的实现,每种Join的实现方式都有各自不同的应用场景。
先来看看这样一条SQL语句:select * from order,item where item.id = order.i_id,参与join的两张表是order和item,join key分别是item.id以及order.i_id。现在假设join采用的是hash join算法,整个过程会经历三步:
确定Build Table以及Probe Table:这个概念比较重要,Build Table会被构建成以join key为key的hash table,而Probe Table使用join key在这张hash table表中寻找符合条件的行,然后进行join链接。Build表和Probe表是Spark决定的。通常情况下,小表会被作为Build Table,较大的表会被作为Probe Table。
构建Hash Table:依次读取Build Table(item)的数据,对于每一条数据根据Join Key(item.id)进行hash,hash到对应的bucket中(类似于HashMap的原理),最后会生成一张HashTable,HashTable会缓存在内存中,如果内存放不下会dump到磁盘中。
匹配:生成Hash Table后,在依次扫描Probe Table(order)的数据,使用相同的hash函数在Hash Table中寻找hash(join key)相同的值,如果匹配成功就将两者join在一起。
基本原理可以参考上图,这里有两个问题需要关注:
上文的hash join是传统数据库中的单机join算法,为了尽可能利用分布式计算资源进行并行计算,提高总体效率,在分布式环境下需要经过一定的改造。hash join分布式实现有broadcast hash join和shuffle hash join两种方案:
Broadcast Hash Join
Broadcast Hash Join可以分为两步:Broadcast Hash Join有以下几个条件:
Broadcast Hash Join的缺点也是比较明显的,即只能广播较小的表,否则数据的冗余传输会远大于shuffle的开销。另外被广播的表在每个executor上都会保存一份,这会对executor的内存造成一定的压力。
Shuffle Hash Join
Shuffle Hash Join分为两步:Hash Join方式只对于两张表中有一张是小表的情况适用,但当两个表都非常大时则不适合。这是因为join时生成的join key会非常大,不能将数据完全加载到内存中。为了应对大表join的场景,SparkSQL提供了一种全新的方案Sort Merge Join。
Sort Merge Join首先将两张表按照join key进行重新shuffle,保证join key值相同的记录会被分在相应的分区,分区后对每个分区内的数据进行排序,排序后再对相应的分区内的记录进行连接。可以看出,无论分区有多大,Sort Merge Join都不用把一侧的数据全部加载到内存中,因为两个序列都是有序的,此时从头遍历,碰到key相同的就输出,如果不同,左边小就继续取左边,反之取右边。
整个过程分为三个步骤:
转载地址:http://wgcmb.baihongyu.com/