大数据面试题(自己总结的) 面试题 linux: 使用哪一个命令可以查看自己文件系统的磁盘空间配额呢? 使用命令repquota 能够显示出一个文件系统的配额信息 说一下ps和top命令的区别 ps命令只能显示执行瞬间的进程状态 top命令实时跟进进程状态 Hadoop hdfs1.0和2.0架构组成? hdfs1.0: : 作为记录和协调节点,主要记录了⽂件的数据信息、⽂件与块的映射信息以及块与datanode的映射信息。前⼆者会被实例化到硬盘,后者由datanode汇报; 和datanode保持⼼跳,监控DD状态; 接收客户端的请求; 记录⽇志。 : 存放⽂件数据信息和校验信息,默认备份三分数据; ⼼跳机制 读写数据 : 合并fsimage快照和edit hdfs2.0 主节点 ,功能和hadoop1.0⼀样 :备⽤NameNode 不能发送指令 合并快照和⽇志,其他功能主节点⼀致,同样需要和DN保持⼼跳 :数据节点 和主备NN都保持⼼跳 :⼀致性⽇志管理器 ⼀般有3个搭建集群,每次NN Active操作前先将⽇志存放到QJM上,存储成功,主节点开始操作。 SNN不是实时最新⽇志,⽽是每隔⼀段时间去⼀次。 :故障转移控制器 ⽤于主备节点的切换 由zookeeper维护和投票选举 :分布式协调框架 启动时协助选举 记录当前主节点的状态 hdfs读写流程 客户端向HDFS服务器发送写数据请求—>到达hdfs的⽂件系统⽂件系统通过远程RPC调⽤ namenode 的 create() ⽅法进⾏校验请求: 检查是否有空间写⼊这个⽂件、请求路径是否存在、当前⽤户权限是否允许写⼊ 检查不通过,抛出异常给客户端 检查通过,返回成功信息,NN创建⼀个entry对象⽤于记录数据、⽂件与Block映射 、Block 与DataNote的映射; ⽂件系统收到成功消息,创建⼀个输出流给客户端客户端开始传输⽂件,⾸先需要向NameNode询问第⼀个Block存放的位置,NameNode通过机架 感知策略将第⼀个block与DN的映射地址发送给客户端。客户端和DataNode节点创建管道pipeline连接,⽂件数据被切分成block,传输时再划分成⼀个个 packet进⾏传输。客户端收到成功消息,继续发送剩下的packet,直到第⼀块block发送完成,撤销当前管道。将第⼀块block发送成功信息发送给namenode,namenode记录更新entry信息。重复操作直到所有block传输完成,关闭输出流。 ⼀、建⽴DataQueue队列和AckQueue队列来解决数据传输错误的问题。 ⼆、buffer输出流传输字节数组到package时,通过更⼩单的chunk来分装,每个chunk为512b,同 时还有checksum校验数据4b,构成516b⼤⼩每次传⼊package。 主要流程:⾸先客户端从⾃⼰的硬盘以流的⽅式读取数据⽂件到⾃⼰的缓存(内存)中。然后将缓存中的数据以chunk(512B)和checksum(4B)的⽅式放⼊到packet(64K)。当packet满的时候加⼊到添加到 dataqueue (队列,先进先出顺序)。datastreamer将package传输并挂载到ack队列。客户端发送⼀个Packet数据包以后,会有⼀个⽤来接收ack的ResponseProcessor进程开始接收 ack: 如果true,那么就从 ackqueue删除掉这个packet ; 如果false,将 ackqueue中所有的packet重新挂载到 发送队列,重新发送。 :客户端发送读请求到DFS。DFS通过远程RPC调⽤Open⽅法,去NameNode查找这个⽂件的信息(权限,⽂件是否存在) 。如果成功,DFS创建FSDataInputStream对象,客户端通过这个对象读取数据 。客户端去NameNode⽂件第⼀个Block信息,就近原则选择DN对应的数据读取。依次类推读取其他块的信息,直到最后⼀个块,将Block合并成⼀个⽂件。关闭FSDataInputStream流。 Yarn 的资源调度流程 客户端发送各种类型的job给服务器,yarn资源管理器分配⼀个当前任务的领队 job1ApplicationMaster进⾏任务管理;Container可以是Map资源也可以是Reduce算法,maptask资源分区排序好交给reducetask, reduce返回结果给job1ApplicationMaster,job1ApplicationMaster再汇报给ResourceManger, ResourceManger完成当前job1,撤掉任务领队和container;每⼀个DataNode上默认有⼀个NodeManager ,与主备ResourceManger保持⼼跳,监听 DataNode的情况。 MapReduce 的执⾏流程? ⾸先,原始数据file切分成块存放在HDFS上,因为数据存储到HDFS上不可变,有可能块的数量和 集群的计算能⼒不匹配,所以我们通过控制split切⽚⼤⼩来协调分配,⼀般⼀个切⽚对应⼀个 map,切⽚⼤⼩默认128M。Map阶段:默认每次从block中读取⼀⾏数据在内存中进⾏拆分计算,产⽣临时结果超出内存⼤⼩, 通过环形数据缓冲区存放⼊硬盘。在写出到硬盘前,要先进⾏分区(根据reduce)、快速排序(尽 可能减少空间占⽤)再溢写到硬盘。最后需要合并溢写产⽣的很多有序(分区 key)的⼩⽂件变成⼀个 有序的⼤⽂件。reduce阶段: 拉取:将Map阶段合并的数据块中的结果拉取到对应的Reduce; 合并:将从多个Map端拉取的数据合并到⼀起,⽅便操作;reduce操作:把相邻的数据取出来即可计算最终的结果。output:将最终结果写出到存储系统。 源码分析 split切片 总结:一个 Split 对应一个 Map默认 Split 分片大小是默认的 BLK 大小如果要调大切片,就设置 minSize。如果要调小切片,就设置 maxSize1.1 阈值设置是为了减少切片的数量从而提高MR程序计算的性能计算向数据靠拢,将我们的程序打成jar包上传至MR服务器去执行最终任务是运行在 YARN Mapper 总结:行读取器 LineRecordReader环形数据缓冲区a. 默认 100M b. 溢写阈值 0.8默认使用 Hash 分区默认使用快速排序程序员自定义的比较器,如果拿不到,默认的比较器 Reduce 总结: 1. copy 拉取阶段 2. sort 排序,map阶段是多个有序的文件,拉到 reduce 后变成内部有序外部无序的若干文件,排序 3. reduce 合并为一个文件输出 LineRecodWrite 写出 4. 程序员自定义的分组器,如果不到,程序员自定义的比较器,如果还不到,默认的比较器 天气案例练习 要对数据线进行分区,按照省,区,时间,温度,因为需求是求温度最高的前三个区+时间,有可能出现同一天温度是最高的情况,所以我们要在,到了,要拉取数据,他会,,因为我们在map的时候,但是这个比较器已经细分到了温度,到,所以迫使我去自定义一个分组器,这个分组器是前三个信息的,当然可以用set和map再进行处理,去解决同一天的问题,set有一点特殊。需要先把前三个追加,追加完后,把温度再添加,map利用键值对value的形式更加容易理解 为什么shuffle Hadoop的集群环境,大部分的map task和reduce task 是执行在不同的节点上的,那么reduce就要取得map的输出结果,一般就需要在不同的节点上去拉取;那么集群中运行的很多个Job,task的执行会对集群中网络资源消耗严重,虽说这种消耗是正常的,不可避免的,但是可以采取措施减少不必要的网络消耗,另一方面,每个节点内部,相对比于内存,磁盘IO对Job的完成时间影响较大。 zookeeper: 什么是Zookeeper?谈谈你对Zookeeper的认识? Zookeeper是一个分布式的,开放源代码的分布式应用程序协调服务。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。 Zookeeper的核心功能 Zookeeper提供了三个核心功能:文件系统、通知机制和集群管理机制。文件系统 Zookeeper存储数据的结构,类似于一个文件系统。每个节点称之为znode,买个znode都是类似于K-V的结构,每个节点的名字相当于key,每个节点中都保存了对应的数据,类似于key-value中的value。通知机制 当某个client监听某个节点时,当该节点发生变化时,zookeeper就会通知监听该节点的客户端,后续根据客户端的处理逻辑进行处理。集群管理机制 zookeeper本身是一个集群结构,有一个leader节点,负责写请求,多个follower节点负责相应读请求。并且在leader节点故障的时候,会根据选举机制从剩下的follower中选举出新的leader。 Zookeeper的工作原理 Zookeeper的核心是原子广播,这个机制保证了各个Server之前的同步。实现这个机制的协议叫做Zab协议。Zab协议有两种模式,分别是恢复模式(选主)和广播模式(同步)。当服务启动或者在leader崩溃后,Zab就进入了恢复模式,当leader被选举出来,且大多数Server完成了和leader的状态同步后,恢复模式就结束了。状态同步保证了leader和server具有相同的系统状态。之后进入广播模式,如果这个时候当一个server加入到Zookeeper服务中,它会在恢复模式下启动,发现leader,并和leader进行状态同步。等同步结束,它也参与消息广播。Zookeeper服务一直维持在Broadcast状态,直到leader崩溃或者leader失去了大部分followers的支持。 Zookeeper 的选举机制 超过半数节点的投票当选主节点; 任期内的节点序号⼤直接当选; 如果没有任期或任期相同,事务id⼤的节点选举成功; 如果事务id相同,服务器id⼤的节点选举成功; Hbase: Hbase是什么? (1) Hbase一个分布式的基于列式存储的数据库,基于Hadoop的hdfs存储,zookeeper进行管理。 (2) Hbase适合存储半结构化或非结构化数据,对于数据结构字段不够确定或者杂乱无章很难按一个概念去抽取的数据。 (3) Hbase为null的记录不会被存储. (4)基于的表包含rowkey,时间戳,和列族。新写入数据时,时间戳更新,同时可以查询到以前的版本. (5) hbase是主从架构。hmaster作为主节点,hregionserver作为从节点。 HBase 的特点是什么? 1)大:一个表可以有数十亿行,上百万列; 2)无模式:每行都有一个可排序的主键和任意多的列,列可以根据需要动态的增加,同一 张表中不同的行可以有截然不同的列; 3)面向列:面向列(族)的存储和权限控制,列(族)独立检索; 4)稀疏:空(null)列并不占用存储空间,表可以设计的非常稀疏; 5)数据多版本:每个单中的数据可以有多个版本,默认情况下版本号自动分配,是单 格插入时的时间戳; 6)数据类型单一:Hbase 中的数据都是字符串,没有类型。 HBase 和 Hive 的区别? Hive 和 Hbase 是两种基于 Hadoop 的不同技术–Hive 是一种类 SQL 的引擎,并且运行MapReduce 任务,Hbase 是一种在 Hadoop 之上的 NoSQL 的 Key/vale 数据库。当然,这两种工具是可以同时使用的。就像用 Google 来搜索,用 FaceBook 进行社交一样,Hive 可以用来进行统计查询,HBase 可以用来进行实时查询,数据也可以从 Hive 写到 Hbase,设置再从 Hbase 写回 Hive。 HBase 适用于怎样的情景? ① 半结构化或非结构化数据 ② 记录非常稀疏 ③ 多版本数据 ④ 超大数据量 描述 HBase 的 rowKey 的设计原则? ① Rowkey 长度原则 Rowkey 是一个二进制码流,Rowkey 的长度被很多开发者建议说设计在 10~100 个字节,不过建议是越短越好,不要超过 16 个字节。 原因如下: (1)数据的持久化文件 HFile 中是按照 KeyValue 存储的,如果 Rowkey 过长比如 100个字节,1000 万列数据光 Rowkey 就要占用 100*1000 万=10 亿个字节,将近 1G 数据,这会极大影响 HFile 的存储效率; (2)MemStore 将缓存部分数据到内存,如果 Rowkey 字段过长内存的有效利用率会降低,系统将无法缓存更多的数据,这会降低检索效率。因此 Rowkey 的字节长度越短越好。 (3)目前操作系统是都是 64 位系统,内存 8 字节对齐。控制在 16 个字节,8 字节 的整数倍利用操作系统的最佳特性。 ② Rowkey 散列原则 如果Rowkey 是按时间戳的方式递增,不要将时间放在二进制码的前面,建议将Rowkey的高位作为散列字段,由程序循环生成,低位放时间字段,这样将提高数据均衡分布在每个Regionserver 实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息将产生所有新数据都在一个 RegionServer 上堆积的热点现象,这样在做数据检索的时候负载将会集中在个别 RegionServer,降低查询效率。 ③ Rowkey 唯一原则 必须在设计上保证其唯一性。 描述 HBase 中 scan 和 get 的功能以及实现的异同? HBase 的查询实现只提供两种方式: 1)按指定 RowKey 唯一一条记录,get 方法(org.apache.hadoop.hbase.client.Get)Get 的方法处理分两种 : 设置了 ClosestRowBefore 和没有设置 ClosestRowBefore 的rowlock。主要是用来保证行的事务性,即每个 get 是以一个 row 来标记的。一个 row 中可以有很多 family 和 column。 2)按指定的条件一批记录,scan 方法(org.apache.Hadoop.hbase.client.Scan)实现条件查询功能使用的就是 scan 方式。 请详细描述 HBase 中一个 cell 的结构? HBase 中通过 row 和 columns 确定的为一个存贮单称为 cell。 Cell:由{row key, column(=<family> + <label>), version}唯一确定的单。cell 中的数据是没有类型的,全部是字节码形式存贮。 简述 HBase 中 compact 用途是什么,什么时候触发,分为哪两种,有什么区别,有哪些相关配置参数?(☆☆☆☆☆) 在 hbase 中每当有 memstore 数据 flush 到磁盘之后,就形成一个 storefile,当 storeFile的数量达到一定程度后,就需要将 storefile 文件来进行 compaction 操作。 Compact 的作用: ① 合并文件 ② 清除过期,多余版本的数据 ③ 提高读写数据的效率 HBase 中实现了两种 compaction 的方式:minor and major. 这两种 compaction 方式的 区别是: 1、Minor 操作只用来做部分文件的合并操作以及包括 minVersion=0 并且设置 ttl 的过 期版本清理,不做任何删除数据、多版本数据的清理工作。 2、Major 操作是对 Region 下的 HStore 下的所有 StoreFile 执行合并操作,最终的结果 是整理合并出一个文件。 HBase 优化? (1)高可用 在 HBase 中 Hmaster 负责监控 RegionServer 的生命周期,均衡 RegionServer 的负载,如果 Hmaster 挂掉了,那么整个 HBase 集群将陷入不健康的状态,并且此时的工作状态并不会维持太久。所以 HBase 支持对 Hmaster 的高可用配置。 (2)预分区 每一个 region 维护着 startRow 与 endRowKey,如果加入的数据符合某个 region 维护的rowKey 范围,则该数据交给这个 region 维护。那么依照这个原则,我们可以将数据所要投放的分区提前大致的规划好,以提高 HBase 性能 . (3)RowKey 设计 一条数据的唯一标识就是 rowkey,那么这条数据存储于哪个分区,取决于 rowkey 处于哪个一个预分区的区间内,设计 rowkey 的主要目的 ,就是让数据均匀的分布于所有的 region中,在一定程度上防止数据倾斜。接下来我们就谈一谈 rowkey 常用的设计方案 (4)7.4 内存优化 HBase 操作过程中需要大量的内存开销,毕竟 Table 是可以缓存在内存中的,一般会分配整个可用内存的 70%给 HBase 的 Java 堆。但是不建议分配非常大的堆内存,因为 GC 过程持续太久会导致 RegionServer 处于长期不可用状态,一般 16~48G 内存就可以了,如果因为框架占用内存过高导致系统内存不足,框架一样会被系统服务拖死。 (5)基础优化 Region 如何预建分区? 预分区的目的主要是在创建表的时候指定分区数,提前规划表有多个分区,以及每个分区的区间范围,这样在存储的时候 rowkey 按照分区的区间存储,可以避免 region 热点问题。 通常有两种方案: 方案 1:shell 方法 create ‘tb_splits’, {NAME => ‘cf’,VERSIONS=> 3},{SPLITS => [’10’,’20’,’30’]} 方案 2: JAVA 程序控制 · 取样,先随机生成一定数量的 rowkey,将取样数据按升序排序放到一个集合里; · 根据预分区的 region 个数,对整个集合平均分割,即是相关的 splitKeys; HBaseAdmin.createTable(HTableDescriptortableDescriptor,bytesplitkeys)可以指定预分区的 splitKey,即是指定 region 间的 rowkey 临界值。 HRegionServer 宕机如何处理? 1)ZooKeeper 会监控 HRegionServer 的上下线情况,当 ZK 发现某个 HRegionServer 宕机之后会通知 HMaster 进行失效备援; 2)该 HRegionServer 会停止对外提供服务,就是它所负责的 region 暂时停止对外提供服务; 3)HMaster 会将该 HRegionServer 所负责的 region 转移到其他 HRegionServer 上,并且会对 HRegionServer 上存在 memstore 中还未持久化到磁盘中的数据进行恢复; 4)这个恢复的工作是由 WAL 重播来完成,这个过程如下: · wal 实际上就是一个文件,存在/hbase/WAL/对应 RegionServer 路径下。 · 宕机发生时,读取该 RegionServer 所对应的路径下的 wal 文件,然后根据不同的region 切分成不同的临时文件 recover.edits。 · 当 region 被分配到新的 RegionServer 中,RegionServer 读取 region 时会进行是否存在 recover.edits,如果有则进行恢复。 HBase 读写流程?(☆☆☆☆☆) 读: ① HRegionServer 保存着 meta 表以及表数据,要访问表数据,首先 Client 先去访问zookeeper,从 zookeeper 里面 meta 表所在的位置信息,即找到这个 meta 表在哪个HRegionServer 上保存着。 ② 接着 Client 通过刚才到的 HRegionServer 的 IP 来访问 Meta 表所在的HRegionServer,从而读取到 Meta,进而到 Meta 表中存放的数据。 ③ Client 通过数据中存储的信息,访问对应的 HRegionServer,然后扫描所在HRegionServer 的 Memstore 和 Storefile 来查询数据。 ④ 最后 HRegionServer 把查询到的数据响应给 Client。 写: ① Client 先访问 zookeeper,找到 Meta 表,并 Meta 表数据。 ② 确定当前将要写入的数据所对应的 HRegion 和 HRegionServer 服务器。 ③ Client 向该 HRegionServer 服务器发起写入数据请求,然后 HRegionServer 收到请求 并响应。 ④ Client 先把数据写入到 HLog,以防止数据丢失。 ⑤ 然后将数据写入到 Memstore。 ⑥ 如果 HLog 和 Memstore 均写入成功,则这条数据写入成功 ⑦ 如果 Memstore 达到阈值,会把 Memstore 中的数据 flush 到 Storefile 中。 ⑧ 当 Storefile 越来越多,会触发 Compact 合并操作,把过多的 Storefile 合并成一个大 的 Storefile。 ⑨ 当 Storefile 越来越大,Region 也会越来越大,达到阈值后,会触发 Split 操作,将 Region 一分为二。 HBase 内部机制是什么? Hbase 是一个能适应联机业务的数据库系统物理存储:hbase 的持久化数据是将数据存储在 HDFS 上。 存储管理:一个表是划分为很多 region 的,这些 region 分布式地存放在很多 regionserver上 Region 内部还可以划分为 store,store 内部有 memstore 和 storefile。 版本管理:hbase 中的数据更新本质上是不断追加新的版本,通过 compact 操作来做版本间的文件合并 Region 的 split。 集群管理:ZooKeeper + HMaster + HRegionServer。 HBase 在进行模型设计时重点在什么地方?一张表中定义多少个 Column Family 最合适?为什么? Column Family 的个数具体看表的数据,一般来说划分标准是根据数据访问频度,如一张表里有些列访问相对频繁,而另一些列访问很少,这时可以把这张表划分成两个列族,分开存储,提高访问效率。 如何提高 HBase 客户端的读写性能?请举例说明(☆☆☆☆☆) 1 开启 bloomfilter 过滤器,开启 bloomfilter 比没开启要快 3、4 倍 2 Hbase 对于内存有特别的需求,在硬件允许的情况下配足够多的内存给它 3 通过修改 hbase-env.sh 中的 export HBASE_HEAPSIZE=3000 #这里默认为 1000m 4 增大 RPC 数量 通过修改 hbase-site.xml 中的 hbase.regionserver.handler.count 属性,可以适当的放大RPC 数量,默认值为 10 有点小。 直接将时间戳作为行健,在写入单个 region 时候会发生热点问题,为什么呢?(☆☆☆☆☆) region 中的 rowkey 是有序存储,若时间比较集中。就会存储到一个 region 中,这样一个 region 的数据变多,其它的 region 数据很少,加载数据就会很慢,直到 region 分裂,此问题才会得到缓解。 请描述如何解决 HBase 中 region 太小和 region 太大带来的冲突? Region 过大会发生多次compaction,将数据读一遍并重写一遍到 hdfs 上,占用io,region过小会造成多次 split,region 会下线,影响访问服务,最佳的解决方法是调整 hbase.hregion.max.filesize 为 256m。 解释一下布隆过滤器原理(☆☆☆☆☆)? 算法流程: 构建一个长度为 n 的数组,每个比特位初始化为 0 需要 k 个 hash 函数,每个函数可以把 key 散列为一个整数 插入m 个已知的 key,循环进行下面的操作 a. 分别用 k 个 hash 函数对key 进行散列 b. 将散列值对应的二进制位置为 1 查找 key 是否存在 a. 分别用 k 个 hash 函数对key 进行散列 b. 查看对应的二进制位是否都为 1 特点 只需要少量的存储空间(n) 通过布隆过滤器后也有一定的概率不存在这个 key 无法删除 判断key 是否 一定不存在 或 可能存在 Hive 你是怎么理解 Hive 这个组件的? hive是⼀个基于 Hadoop 的数仓⼯具,也就是对⼤数据进⾏分析处理的⼯具。 可以将结构化和半结构化的数据⽂件映射为⼀张数据库表:是为了进⾏hql,所以需要先将原始数据转换 成⼀个⼆维表形式,⽅便操作。 提供简单的 sql 查询功能: Hive的HQL表达能⼒有限 、Hive的效率⽐较低。 本质是将HiveSQL语句转化成MapReduce任务执⾏。 表层:hive是搭建数据库的数据库,是逻辑上的数据库,⼀些查询⽅式表现得像关系型数据库,所以业 内的技术⼈员就进⾏了观念上统⼀,将hive划分到了关系型数据库; 底层:将SQL转换为MapReduce的任务进⾏运算,底层由HDFS来提供数据存储; 分区表和分桶表 分区表和分桶表的作⽤都是为了加快查询的速度,不需要做全表查询。 分区表:借助物理⽂件夹分区,不会⼆次判定。它提供了⼀个隔离数据和优化查询的便利⽅式,不过并 ⾮所有的数据都可形成合理的分区,尤其是需要确定合适⼤⼩的分区划分⽅式(有的数据分区数据过⼤, 有的很少,即我们常说的数据倾斜)。 分桶表:将数据按照字段划分,分到多个⽂件中去。主要可以jion时提⾼效率,减少笛卡尔积数量,另 外对数据集的⼀部分进⾏抽样测试,提⾼抽样的效率。 hive产生数据倾斜的原因及解决办法? 空值引发的数据倾斜 在数据采集的时候,判断导致数据倾斜的key是不是提前过滤掉了,在inner join内连接的时候,hive默认过滤了空值,但是对于left join等会保留左边的值 1insert overwirte 先过滤掉空值,再进行join 2时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在 join 的结果中,此时我们可以表 a 中 key 为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的 reducer 上。 没有shuffle阶段,减少了大量的网络传输;没有了reduce阶段,防止数据倾斜的发生; 原理:两阶段聚合指的是先局部聚合再全局聚合。局部聚合时候给每个key值加上随机前缀进行打散,原本相同的key值会变成不同的新key值,便可以让原来由一个task处理的数据根据加上随机前缀后的新key值分散到多个Task上做聚合,从而缓解单个task处理数据量过多的问题。再去除随机前缀做全局聚合,既可以得到最终结果 数据:读取的数据是压缩的不可分割的⼤⽂件只能放在⼀个节点上处理;—->多进⾏⼀次计算实际数据某些⽐例失衡(护⼠班计算男⼥⽐例);—>增加随机值数据中含有⼤量空值;—>过滤null、减法、为null分配随机数 算法:任务会处理⼤量相同建的数据(⽐如空值—>过滤null、减法、为null分配随机数);⽆法削减的中间结果的数据量(map阶段对数据进⾏预处理(预聚合combiner));多维聚合计算数据膨胀(两张hive表jion—>先把数据分⼩分细再均分;或者将倾斜的数据再进⾏ ⼀次操作)。 关于数据倾斜是我们经常会遇到的问题,如果遇到处理进程达到99%⼀直不动,就有可能出现倾斜了, 除了⼀些常⻅的处理⽅式之外,我们还需要更多的算法优化。 hive的Serde是什么 SerDe 是 Serializer/Deserializer 的简写。hive使用 SerDe 进行行对象的序列与反序列化。最后实现把文件内容映射到 hive 表中的字段数据类型 内外部表 内部表:即创建Hive内部表时,数据将真实存在于表所在的目录内,删除内部表时,物理数据和文件也一并删除 外部表:即新建表仅仅是指向一个外部目录而已。同样,删除时也并不物理删除外部目录,而仅仅是将引用和定义删除 什么时候使用内部表,什么时候使用外部表? 如果数据的所有处理都在Hive中进行,那么更倾向于选择内部表。 如果Hive和其他工具针对相同的数据集做处理,那么外部表更合适。 一般情况下,在企业内部都是使用外部表的。因为会有多人操作数据仓库,可能会产生数据表误删除操作,为了数据安全性,通常会使用外部表 一行变多行和多行变一行 先split切分然后explode爆炸开 先炸开然后:COLLECT_SET() 和 COLLECT_LIST() 可以将多行数据转成一行数据,区别就是 LIST 的素可重复而 SET 的素是 去重的。 spark spark任务提交命令 spark-submit提交 写脚本#!/bin/bash master yarn deploy-mode cluster 部署模式运行在driver进程在客户端还是集群 class com.yjxxt.etl.OdsApp2DwdApp 运行程序的类的全限定名 name 运行程序的名称 conf spark.sql.shuffle.partitions=5 运行程序需要指定配置信息 driver-memory 运行程序指定的driver的内存,默认为1G executor-memory 单个executor需要分配的内存大小,默认为1G driver-cores driver驱动程序的核数,默认为1,只有在部署模式为cluster的时候有效 queue default num-executors executor的个数,默认为2个executor https://zhuanlan.zhihu.com/job/yjxshopetl.jar node02 2021-01-01 2021-01-02 运行程序的jar包,一般放在hdfs上面 spark和Mr的区别 内存和磁盘的区别,job中途失败重新计算的区别 mr是多进程单线程,spark是多进程多线程 mr程序由多个独立task构成,spark是由多个独立的executer进程构建的临时资源池构成,一个executer分为多个task,有独立的mapshuffer,和reduceshuffer 总结,spark生态更为丰富,功能更为强大、性能更佳,适用范围更广;mapreduce更简单、稳定性好、适合离线海量数据挖掘计算 RDD持久化原理? RDD持久化原理:spark非常重要的一个功能特性就是可以将RDD持久化在内存中。调用cache()和persist()方法即可。cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用persist()的无参版本persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中清除缓存,可以使用unpersist()方法。RDD持久化是可以手动选择不同的策略的。在调用persist()时传入对应的StorageLevel即可。 checkpoint检查点机制? checkpoint检查点机制:应用场景:当spark应用程序特别复杂,从初始的RDD开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使用checkpoint功能。原因:对于特别复杂的Spark应用,会出现某个反复使用的RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据 Checkpoint首先会调用SparkContext的setCheckPointDIR()方法,设置一个容错的文件系统的目录,比如说HDFS;然后对RDD调用checkpoint()方法。之后在RDD所处的job运行结束之后,会启动一个单独的job,来将checkpoint过的RDD数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。 检查点机制是我们在spark streaming中用来保障容错性的主要机制,它可以使spark streaming阶段性的把应用数据存储到诸如HDFS等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:控制发生失败时需要重算的状态数。Spark streaming可以通过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。2. 提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序并让驱动器程序从检查点恢复,这样spark streaming就可以读取之前运行的程序处理数据的进度,并从那里继续。 checkpoint和持久化机制的区别? checkpoint和持久化机制的区别:最主要的区别在于持久化只是将数据保存在BlockManager中,但是RDD的lineage(血缘关系,依赖关系)是不变的。但是checkpoint执行完之后,rdd已经没有之前所谓的依赖rdd了,而只有一个强行为其设置的checkpointRDD,checkpoint之后rdd的lineage就改变了。持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是checkpoint的数据通常是保存在高可用的文件系统中,比如HDFS中,所以数据丢失可能性比较低 Sparkstreaming以及基本工作原理? Spark streaming以及基本工作原理:Spark streaming是spark core API的一种扩展,可以用于进行大规模、高吞吐量、容错的实时数据流的处理。它支持从多种数据源读取数据,比如Kafka、Flume、Twitter和TCP Socket,并且能够使用算子比如map、reduce、join和window等来处理数据,处理后的数据可以保存到文件系统、数据库等存储中。Spark streaming内部的基本工作原理是:接受实时输入数据流,然后将数据拆分成batch,比如每收集一秒的数据封装成一个batch,然后将每个batch交给spark的计算引擎进行处理,最后会生产处一个结果数据流,其中的数据也是一个一个的batch组成的。 spark解决了hadoop的哪些问题? spark解决了hadoop的哪些问题:MR:抽象层次低,需要使用手工代码来完成程序编写,使用上难以上手; Spark:Spark采用RDD计算模型,简单容易上手。MR:只提供map和reduce两个操作,表达能力欠缺; Spark:Spark采用更加丰富的算子模型,包括map、flatmap、groupbykey、reducebykey等;MR:一个job只能包含map和reduce两个阶段,复杂的任务需要包含很多个job,这些job之间的管理以来需要开发者自己进行管理; Spark:Spark中一个job可以包含多个转换操作,在调度时可以生成多个stage,而且如果多个map操作的分区不变,是可以放在同一个task里面去执行;MR:中间结果存放在hdfs中; Spark:Spark的中间结果一般存在内存中,只有当内存不够了,才会存入本地磁盘,而不是hdfs;MR:只有等到所有的map task执行完毕后才能执行reduce task; Spark:Spark中分区相同的转换构成流水线在一个task中执行,分区不同的需要进行shuffle操作,被划分成不同的stage需要等待前面的stage执行完才能执行。MR:只适合batch批处理,时延高,对于交互式处理和实时处理支持不够; Spark:Spark streaming可以将流拆成时间间隔的batch进行处理,实时计算。 RDD中reduceBykey与groupByKey哪个性能好,为什么 reduceByKey:reduceByKey会在结果发送至reducer之前会对每个mapper在本地进行merge,有点类似于在MapReduce中的combiner。这样做的好摸鱼 处在于,在map端进行一次reduce之后,数据量会大幅度减小,从而减小传输,保证reduce端能够更快的进行结果计算。groupByKey:groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成OutOfMemoryError。所以在进行大量数据的reduce操作时候建议使用reduceByKey。不仅可以提高速度,还可以防止使用groupByKey造成的内存溢出问题。 Spark on Hive &Hive on Spark spark on hive :Hive 存储,spark优化,执行 Hive on Spark :Hive即作为存储又负责sql的解析优化,Spark负责执行 spark优化 指定资源分配的默认参数 spark conf下的spark-env.sh SPARK_WORKER_CORES SPARK_WORKER_MEMORY SPARK_WORKER_INSTANCES #每台机器启动worker数 提交Application时给当前Application分配更多的资源 自定义分区器,如果读取数据是在SparkStreaming中一般我们的操作就是使用repartition去增大sparksql查询出的 rdd的分区数。park.default.parallelism 500spark.sql.shuffle.partitions—200 避免创建重复的RDD 对多次使用的RDD进行持久化 持久化算子 cache: MEMORY_ONLY persist: MEMORY_ONLY MEMORY_ONLY_SER MEMORY_AND_DISK_SER 一般不要选择带有_2的持久化级别 尽量避免使用shuffle类的算子 使用广播变量来模拟使用join,使用情况:一个RDD比较大,一个RDD比较小。 join算子=广播变量+filter、广播变量+map、广播变量+flatMap 使用map-side预聚合的shuffle操作 即尽量使用有combiner的shuffle类算子。 combiner概念: 在map端,每一个map task计算完毕后进行的局部聚合。 combiner好处: 1) 降低shuffle write写磁盘的数据量。 2) 降低shuffle read拉取数据量的大小。 3) 降低reduce端聚合的次数。 有combiner的shuffle类算子: 1) reduceByKey:这个算子在map端是有combiner的,在一些场景中可以使用reduceByKey代替 groupByKey。 2) aggregateByKey 3) combinerByKey 尽量使用高性能的算子 使用reduceByKey替代groupByKey 使用mapPartition替代map 使用foreachPartition替代foreach filter后使用coalesce减少分区数 使用repartitionAndSortWithinPartitions替代repartition与sort类操作 cache: MEMORY_ONLY persist: MEMORY_ONLY MEMORY_ONLY_SER MEMORY_AND_DISK_SER 使用repartition和coalesce算子操作分区 使用广播变量 使用Kryo优化序列化性能 优化数据结构 java中有三种类型比较消耗内存: 1) 对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。 2) 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。 3) 集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合 素,比如Map.Entry。 因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据 结构,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类 型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。 使用高性能的库fastutil RACK_LOCAL,NO_PREF,NODE_LOCAL Spark中如何内存调优 1) 提高Executor总体内存的大小 2) 降低储存内存比例或者降低聚合内存比例 数仓 flume为什么要进行级联采集 实时监控一批文件,并记录每个文件最新消费位置,agent进程重启后不会有 数据丢失的问题 使得各个节点的数据可以进行一次汇总,能够溯源 Flume拦截器 自定义拦截器的步骤 ① 实现Interceptor拦截器的接口 ② 重写四个方法: initialize 初始化 public Event intercept(Event event) 处理单个Event public Listintercept(Listevents) 处理多个event方法,在这个方法中调用Event intercept(Event event) ③ close 方法 ④ 静态内部类,实现interceptor.Builder Flume的事务机制 Flume使用两个独立的事务分别负责从Source到Channel,及从Channel到Sink的事件传递。 比如Spooling Directory Source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到Channel且提交成功,那么Source就将该文件标记为完成。同理,事务以类似的方式处理从Channel到Sink的传递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚且所有的事件都会保持到Channel中,等待重新传递 Flume采集数据会丢失吗? 不会,Channel通道可以选择File Channel,数据传输自身有事务机制保证source到Channel是事务性的,Channel到Sink是事务性的 Flume不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由Sink发出,但是没有接收到响应,Sink会再次发送数据,此时可能会导致数据的重复 Flume如何优化 ① Source 增加Source个数 ② Channel type 选择memory时Channel的性能最好,但是如果Flume进程意外挂掉可能会丢失数据。type选择file时Channel的容错性更好,但是性能上会比memory channel差。 ③ Sink 增加Sink的个数可以增加Sink消费event的能力Sink也不是越多越好够用就行,过多的Sink会占用系统资源,造成系统资源不必要的浪费。 数据怎么采集到Kafka,实现方式 使用官方提供的Flume-Kafka插件,插件的实现方式是自定义了Flume的 Sink,将数据从Channel中取出,通过Kafka的producer写入到Kafka中,可以自定义分区等 Flume和Kafka采集日志区别,采集日志时中间停了,怎么记录之前的日志? ① Flume采集日志是通过流的方式直接将日志收集到存储层,而Kafka是将数据缓存在Kafka集群中,待后期可以采集到存储层。 ② Flume采集中间停了,可以采用文件的方式记录之前的日志,而Kafka是采用offset偏移量的方式记录之前的日志。 为什么有的公司要在Flume后面接上Kafka? Flume -> Kafka -> HDFS -> MR计算 Kafka作为消息中间件主要作用是解耦。因为在不同的系统之间的融合处往往数据生产速率和消费速率不相同,这时可以在这些系统之间加Kafka缓存。例如线上数据需要输入HDFS,线上数据生产快且具有突发性,如果直接连接上HDFS(kafka-consumer)可能会使得高峰时间hdfs数据写失败,这种情况你可以把数据先写到Kafka,然后从Kafka中导入到hdfs。 DataX读入数据的时候怎么实现高可用? 配置脚本 node02 namespace SecondaryNameNode等等 两个配置 hdfs set xml set core Flume开启第二次采集 需要先删除/usr/local的.json文件,记录的是flume采集的位置,如果不进行删除的话,会认为还在采集,无法采集第二次 重复数据写入同一分区怎么防止数据重复 可以使用insert overwrite 覆盖写入 非分区表怎么防止数据重复 非分区表代表着数据较少,可以先查出之前数据,再将今天的数据uinon进去也可以进行去重,但是试用的是小数据 datax怎么做并发 提升channel的数量1 DataXJob 根据分库分表切分成了 100 个 Task。2 根据 20 个并发,默认单个任务组的并发数量为 5,DataX 计算共需要分配 4 个 TaskGroup。3 这里 4 个 TaskGroup 平分切分好的 100 个 Task,每一个 TaskGroup 负责以 5 个并发共计运行 25 个 Task。 怎么解决零点漂移 这种情况下,可以通过拦截器在flume事件头指定timestamp作为文件的创建依据。 通常是将日志中记录的日志创建时间提取出来,写入flume事件头的字段,有了这个字段,flume创建文件时,会依据这个字段创建文件,这种场景很类似spark、flink的事件事件和处理事件 datax增量数据怎么更新 datax的job是以一个json文件来描述的,本身提供了where条件,支持简单的增量更新。一旦我们的job中,增量抽取的数据比较复杂,比如,本身sql中需要多表关联或者有多个子查询,此时where条件已经无法满足。对于用户配置Table、Column、Where的信息,MysqlReader将其拼接为SQL语句发送到Mysql数据库;对于用户配置querySql信息,MysqlReader直接将其发送到Mysql数据库。当用户配置querySql时,MysqlReader直接忽略table、column、where条件的配置,querySql优先级大于table、column、where选项。也就是说,querySql适用于复杂情况下,配置更灵活,所以,一般的增量更新,我们都会采取querySql来进行。表示这个job是否是增量的is_increment配置项是由我们的java代码来读取解析的。 在改造完增量更新之后,我们需要在datax执行任务之前去调用我们的java代码,以便实现增量与全量的判断,以及增量更新的动态修改json文件中的占位符 java jvm内存结构 JVM的内存结构大致分为五个部分,分别是程序计数器、虚拟机栈、本地方法栈、堆和方法区。除此之外,还有由堆中引用的JVM外的直接内存。 :用于记录下一条JVM指令的执行地址 :每个线程运行时所需要的内存 :本地方法栈为本地方法服务 :所有对象都在这里分配内存,是垃圾收集的主要区域 :用于存放已被加载的类信息、常量、静态变量、即时编译器编译后的代码等数据 数仓怎么设计的 合理的数据仓库分层一方面能够降低耦合性,提高重用性,可读性可维护性,另一方面也能提高运算的效率,影响到数据需求迭代的速度,近而影响到产品决策的及时性。建立数据分层可以提炼公共层,避免烟囱式开发,可见一个合适且合理的数仓分层是极其重要 ODS层存在的意义 数据引入层(ODS,Operational Data Store),又称数据基础层。我们将原始数据几乎无处理地存放在数据仓库系统中,结构上与源系统基本保持一致,是数据仓库的数据准备区。这一层的主要职责是将基础数据同步、存储。可以溯源,如果有些数据出现错误,可以从ods直接,方便,是清洗过的不需要再清洗一遍 DWD和DIM怎么设计的,有什么指标 高基数维度数据:一般是用户资料表、商品资料表类似的资料表。数据量可能是千万级或者上亿级别。 低基数维度数据:一般是配置表,比如枚举值对应的中文含义,或者日期维表。数据量可能是个位数或者几千几万。 这一层主要解决一些数据质量问题和数据的完整度问题。比如用户的资料信息来自于很多不同表,而且经常出现延迟丢数据等问题,为了方便各个使用方更好的使用数据,我们可以在这一层做一个屏蔽。(汇总多个表) DWS层存放的哪些指标 登录次数,访问次数,添加购物车次数,够买商品数,购买不同商品数,购买金额,订单次数,退货次数等 kafka 说一下Kafka 是什么,在大数据开发中充当什么样的角色 Kafka 是一个分布式流式处理平台,具有高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性。作用一:消息系统。具备冗余存储、缓冲、异步通信、扩展性、可恢复性等功能。作用二:存储系统:Kafka有消息持久化和多副本机制。将消息持久化到磁盘,可以把它作为长期的数据存储系统来使用作用三:流式处理平台。Kafka 可以和流式处理框架进行集成。比如像Spark Streaming和Flink。提供了窗口、连接、变换和聚合等各类操作。 Kafka 吞吐量高,为什么 零拷贝:避免了传统IO四步操作,采用DMA 技术,用DMA引擎直接将数据从内核模式传递到网卡设备中页缓存:将磁盘的数据缓存到内存中,将对磁盘的访问变成对内存的访问顺序追加:消息落到磁盘中,采用顺序追加,不支持随机访问分区机制:partition ,实现横向扩展 Kafka 如何保证消息的有序性 一个分区,消费者将消息全部写入一个分区中,一个消费者进行消费。 被字节三面面试官怼死了自定义分区器Partitioner ,重写partition 方法,将消息顺序追加到K个分区,然后在消费者写K个内存队列,相同分区号的数据都存到一个内存Queue中,N个线程分别消费一个内存队列即可 说一下Kafka 的ACK 机制,0,1,-1 分别代表着什么意思 ACK=0 表示生产者在成功写入消息之前不会等待任何来自服务器的响应.ACK=1 表示只要集群的leader分区副本接收到了消息,就会向生产者发送一个成功响应的ack,此时生产者接收到ack之后就可以认为该消息是写入成功的.ACK=-1 表示只有所有参与复制的节点(ISR列表的副本)全部收到消息时,生产者才会接收到来自服务器的响应. Kafka message 的格式讲一下 crc32 循环冗余检验值attributes:一个字节,低三位表示压缩类型。其余位保留key length + keyvalue length + value Kafka 存储topic 的话你了解过在机器上的存储路径格式吗?换句话说:Kafka 文件目录布局给我说一下! 顺丰没答上来直接G 一个主题会有多个分区,那么就会有多个topic-partition 的文件夹。每个分区的日志会切分为多个LogSegment。每个LogSegment 的.log 日志文件都会有两个对应的索引文件。偏移量索引文件(.index 为后缀)和时间戳索引文件(以.timeindex为后缀的文件)。 Kafka 消费者和分区是如何对应的?如果消费者个数比分区数多会出现什么情况? 分区数大于或者等于组中的消费者实例数,那自然没有什么问题,无非一个消费者会负责多个分区 消费者大于分区数的话。这就相当于多线程读取同一个消息,会造成消息处理的重复,且不能保证消息的顺序,这就跟主动推送(push)无异 Kafka 零拷贝,详细过程,传统IO四部操作的过程。零拷贝会经过JVM堆吗? 将数据直接从磁盘文件复制到网卡设备中,不需要经由应用程序之手。减少了内核和用户模式的上下文切换。底层通过sendfile 方法实现。传统IO需要四步。读两步:磁盘到Read Buffer,读缓冲区到用于程序。写两步:应用程序写数据到写缓冲区Socket Buffer,写缓冲区写到网卡设备中。零拷贝技术通过DMA技术将文件内容复制到内核模式的Read Buffer中,和传统IO不同的是,不需要再到用户态走一圈,不再需要额外的Socket Buffer。DMA engine直接将数据从内核模式中传递到网卡设备中。应用程序空间,用户态。应用程序存放数据就是在堆咯,所以,不会经过JVM堆 Kafka 能不能用MySQL 替代? Kafka 有哪几种选举策略? 数据倾斜 你遇到过Spark 的数据倾斜吗你遇到过Hive 的数据倾斜吗你遇到过Redis 的数据倾斜吗你遇到过 Kafka 的数据倾斜吗你在项目中遇到过数据倾斜吗你知道怎么解决数据倾斜吗 无论是哪个组件,在发生消息堆积的时候,也就是大量message 被发送到了一个地方,那我们来拆题: Kafka 数据倾斜:大量数据被发送到了Kafka 中一个partition Spark 数据倾斜:大量数据被发送到了Spark 的一个task Hive 数据倾斜:大量数据被发送到了一个Reduce 任务中 Redis 数据倾斜:大量数据都存到了Redis 集群中的一个节点中 Kafka 数据倾斜解决思路: 自定义分区器Partitioner。 Redis 集群数据倾斜解决方案: 重新进行槽指派 Hive 数据倾斜解决方案 聚合倾斜 Join 倾斜 Spark 数据倾斜解决方案 1.广播变量 场景:对RDD进行join 类操作。A join B。且B的RDD比较小(百兆或者1~2GB)的情况下。 解决思路:对较小的RDD直接collect到内存并创建广播变量。对另一方执行map 类算子。也就是A RDD去和广播变量中的每条数据依次对比key,key相同的两条进行join。 效果:用广播变量 + map 代替join。规避join带来的shuffle。 2.聚合倾斜 场景:对RDD进行reduceByKey 等聚合类shuffle 算子,还有SparkSQL做分组聚合时。部分key 嗷嗷多,导致少数节点OOM,或已经完成的节点都在等这个还在做的节点。 解决思路: 通过map 算子给操作的key打上n以内的随机数,举个例子(hello, 1) (hello, 1) (hello, 1) (hello, 1)变为(1_hello, 1) (1_hello, 1) (2_hello, 1),并进行reduceByKey的局部聚合。然后再次调用map 算子将key 的前缀随机数去掉,再次进行全局聚合。 效果: 将原本一个task 处理的数据分摊到多个task 进行局部聚合。规避了单个task 数据量大。
2024最新激活全家桶教程,稳定运行到2099年,请移步至置顶文章:https://sigusoft.com/99576.html
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。 文章由激活谷谷主-小谷整理,转载请注明出处:https://sigusoft.com/54782.html