自信人生两百年,会当水击三千里! var _hmt = _hmt || []; (function() { var hm = document.createElement("script"); hm.src = "//hm.baidu.com/hm.js?173a39c6aa58fb733b6adcd5d201c30a"; var s = document.getElementsByTagName("script")[0]; s.parentNode.insertBefore(hm, s); })();
  • 《腾讯在Spark上的应用与实践优化》原文参见:http://download.csdn.net/detail/happytofly/8637461

     
    TDW: Tencent Distributed Data Warehouse,腾讯分布式数据仓库;
    GAIA:腾讯自研的基于YARN定制化和优化的资源管理系统;
    Lhoste:腾讯自研的作业的工作流调度系统,类似于Oozie;
    TDW集群规模:
    1. Gaia集群节点数:8000+;
    2. HDFS的存储空间:150PB+;
    3. 每天新增数据:1PB+;
    4. 每天任务数:1M+;
    5. 每天计算量:10PB+;
    Spark集群:
    1. Spark部署在Gaia之上,即是Spark on YARN模式,每个节点是 24 cores 和 60G 内存;
    2. 底层存储包括:HDFS、HBase、Hive、MySQL;
    3. 作业类型,包括:ETL、SparkSQL、Machine Learning、Graph Compute、Streaming;
    4. 每天任务数,10K+;
    5. 腾讯从2013年开始引入Spark 0.6,已经使用2年了;
    Spark的典型应用:
    1. 预测用户的广告点击概率;
    2. 计算两个好友间的共同好友数;
    3. 用于ETL的SparkSQL和DAG任务;
    Case 1: 预测用户的广告点击概率
    1. 数据是通过DCT(Data Collect Tool)推送到HDFS上,然后Spark直接将HDFS数据导入到 RDD&Cache;
    2. 60次迭代计算的时间为10~15分钟,即每次迭代10~15秒;
    Case 2: 计算两个好友间的共同好友数
    1. 根据shuffle数量来确定partition数量;
    2. 尽量使用sort-based shuffle,减少reduce的内存使用;
    3. 当连接超时后选择重试来减少executor丢失的概率;
    4. 避免executor被YARN给kill掉,设置 spark.yarn.executor.memoryoverhead
    5. 执行语句 INSERT TABLE test_result SELECT t3.d, COUNT(*) FROM( SELECT DISTINCT a, b FROM join_1 ) t1 JOIN (SELECT DISTINCT b, c FROM join_2 ) t2 ON (t1.a = t2.c) JOIN (SELECT DISTINCT c, d FROM c, d FROM join_3 ) t3 ON (t2.b = t3.d) GROUP BY t3.d 使用Hive需要30分钟,使用SparkSQL需要5分钟;
    6. 当有小表时使用broadcase join代替Common join;
    7. 尽量使用ReduceByKey代替GroupByKey;
    8. 设置spark.serializer = org.apache.spark.serializer.KryoSerializer;
    9. 使用YARN时,设置spark.shuffle.service.enabled = true;
    10. 在早期版本中Spark通过启动参数固定executor的数量,当前支持动态资源扩缩容特性
      • spark.dynamicAllocation.enabled = true
      • spark.dynamicAllocation.executorIdleTimeout = 120
      • spark.dynamicAllocation.schedulerBacklogTimeout = 10
      • spark.dynamicAllocation.minExecutors/maxExecutors
    11. 当申请固定的executors时且task数大于executor数时,存在着资源的空闲状态。
    <完>
  • 2015年7月6日,Apache Hadoop的稳定版本 2.7.1 正式发布。
     
    Hadoop 2.7的一个小版本发布了,本版本属于稳定版本。
    修复了2.7.0中存在的131个bug。
    这是2.7.x第一个稳定版本,增强的功能列表请通过2.7.0版本部分查看。
    按着计划,下一个2.7.x的小版本是2.7.2.
     
    原文:

    06 July, 2015: Release 2.7.1 (stable) available

    A point release for the 2.7 line. This release is now considered stable.

    Please see the Hadoop 2.7.1 Release Notes for the list of 131 bug fixes and patches since the previous release 2.7.0. Please look at the 2.7.0 section below for the list of enhancements enabled by this first stable release of 2.7.x.

  • Cloudera发布了Kafka的好文,《Deploying Apache Kafka: A Practical FAQ》,参见:http://blog.cloudera.com/blog/2015/07/deploying-apache-kafka-a-practical-faq

     
    是否应当为Kafka Broker使用 固态硬盘 (SSD)
    实际上使用SSD盘并不能显著地改善 Kafka 的性能,主要有两个原因:
    • Kafka写磁盘是异步的,不是同步的。就是说,除了启动、停止之外,Kafka的任何操作都不会去等待磁盘同步(sync)完成;而磁盘同步(disk syncs)总是在后台完成的。这就是为什么Kafka消息至少复制到三个副本是至关重要的,因为一旦单个副本崩溃,这个副本就会丢失数据无法同步写到磁盘。
    • 每一个Kafka Partition被存储为一个串行的WAL(Write Ahead Log)日志文件。因此,除了极少数的数据查询,Kafka中的磁盘读写都是串行的。现代的操作系统已经对串行读写做了大量的优化工作。
    如何对Kafka Broker上持久化的数据进行加密 
    目前,Kafka不提供任何机制对Broker上持久化的数据进行加密。用户可以自己对写入到Kafka的数据进行加密,即是,生产者(Producers)在写Kafka之前加密数据,消费者(Consumers)能解密收到的消息。这就要求生产者(Producers)把加密协议(protocols)和密钥(keys)分享给消费者(Consumers)。
    另外一种选择,就是使用软件提供的文件系统级别的加密,例如Cloudera Navigator Encrypt。Cloudera Navigator Encrypt是Cloudera企业版(Cloudera Enterprise)的一部分,在应用程序和文件系统之间提供了一个透明的加密层。
    Apache Zookeeper正成为Kafka集群的一个痛点(pain point),真的吗?
    Kafka高级消费者(high-level consumer)的早期版本(0.8.1或更早)使用Zookeeper来维护读的偏移量(offsets,主要是Topic的每个Partition的读偏移量)。如果有大量生产者(consumers)同时从Kafka中读数据,对Kafka的读写负载可能就会超出它的容量,Zookeeper就变成一个瓶颈(bottleneck)。当然,这仅仅出现在一些很极端的案例中(extreme cases),即有成百上千个消费者(consumers)在使用同一个Zookeeper集群来管理偏移量(offset)。
    不过,这个问题已经在Kafka当前的版本(0.8.2)中解决。从版本0.8.2开始,高级消费者(high-level consumer)能够使用Kafka自己来管理偏移量(offsets)。本质上讲,它使用一个单独的Kafka Topic来管理最近的读偏移量(read offsets),因此偏移量管理(offset management)不再要求Zookeeper必须存在。然后,用户将不得不面临选择是用Kafka还是Zookeeper来管理偏移量(offsets),由消费者(consumer)配置参数 offsets.storage 决定。
    Cloudera强烈推荐使用Kafka来存储偏移量。当然,为了保证向后兼容性,你可以继续选择使用Zookeeper存储偏移量。(例如,你可能有一个监控平台需要从Zookeeper中读取偏移量信息。) 假如你不得不使用Zookeeper进行偏移量(offset)管理,我们推荐你为Kafka集群使用一个专用的Zookeeper集群。假如一个专用的Zookeeper集群仍然有性能瓶颈,你依然可以通过在Zookeeper节点上使用固态硬盘(SSD)来解决问题。
    Kafka是否支持跨数据中心的可用性
    Kafka跨数据中心可用性的推荐解决方案是使用MirrorMaker。在你的每一个数据中心都搭建一个Kafka集群,在Kafka集群之间使用MirrorMaker来完成近实时的数据复制。
    使用MirrorMaker的架构模式是为每一个"逻辑"的topic在每一个数据中心创建一个topic:例如,在逻辑上你有一个"clicks"的topic,那么你实际上有"DC1.clicks"和“DC2.clicks”两个topic(DC1和DC2指得是你的数据中心)。DC1向DC1.clicks中写数据,DC2向DC2.clicks中写数据。MirrorMaker将复制所有的DC1 topics到DC2,并且复制所有的DC2 topics到DC1。现在每个DC上的应用程序都能够访问写入到两个DC的事件。这个应用程序能够合并信息和处理相应的冲突。
    另一种更复杂的模式是在每一个DC都搭建本地和聚合Kafka集群。这个模式已经被Linkedin使用,Linkedin Kafka运维团队已经在 这篇Blog 中有详细的描述(参见“Tiers and Aggregation”)。
    Kafka支持哪些类型的数据转换(data transformation)
    数据流过的Kafka的时候,Kafka并不能进行数据转换。为了处理数据转换,我们推荐如下方法:
    • 对于简单事件处理,使用 Flume Kafka integration,并且写一个简单的Apache Flume Interceptor。
    • 对于复杂(事件)处理,使用Apache Spark Streaming从Kafka中读数据和处理数据。
    在这两种情况下,被转换或者处理的数据可被写会到新的Kafka Topic中,或者直接传送到数据的最终消费者(Consumer)那里。
    对于实时事件处理模式更全面的描述,看看 这篇文章
    如何通过Kafka发送大消息或者超大负荷量?
    Cloudera的性能测试表明Kafka达到最大吞吐量的消息大小为10K左右。更大的消息将导致吞吐量下降。然后,在一些情况下,用户需要发送比10K大的多的消息。
    如果消息负荷大小是每100s处理MB级别,我们推荐探索以下选择:
    • 如果可以使用共享存储(HDFS、S3、NAS),那么将超负载放在共享存储上,仅用Kafka发送负载数据位置的消息。
    • 对于大消息,在写入Kafka之前将消息拆分成更小的部分,使用消息Key确保所有的拆分部分都写入到同一个partition中,以便于它们能被同一个消息着(Consumer)消费的到,在消费的时候将拆分部分重新组装成一个大消息。
    在通过Kafka发送大消息时,请记住以下几点:
    压缩配置
    • Kafka生产者(Producers)能够压缩消息。通过配置参数compression.codec确保压缩已经开启。有效的选项为"gzip"和"snappy"。
    Broker配置
    • message.max.bytes (default: 1000000): Broker能够接受的最大消息。增加这个值以便于匹配你的最大消息。
    • log.segment.bytes (default: 1GB): Kafka数据文件的大小。确保它至少大于一条消息。默认情况下已经够用,一般最大的消息不会超过1G大小。
    • replica.fetch.max.bytes (default: 1MB): Broker间复制的最大的数据大小。这个值必须大于message.max.bytes,否则一个Broker接受到消息但是会复制失败,从而导致潜在的数据丢失。
    Consumer配置
    • fetch.message.max.bytes (default: 1MB): Consumer所读消息的最大大小。这个值应该大于或者等于Broker配置的message.max.bytes的值。
    其他方面的考虑:
    • Broker需要针对复制为每一个partition分配一个replica.fetch.max.bytes大小的缓存区。需要计算确认( partition的数量 * 最大消息的大小 )不会超过可用的内存,否则就会引发OOMs(内存溢出异常)。
    • Consumers有同样的问题,因子参数为 fetch.message.max.bytes :确认每一个partition的消费者针对最大的消息有足够可用的内存。
    • 大消息可能引发更长时间的垃圾回收停顿(garbage collection pauses)(brokers需要申请更大块的内存)。注意观察GC日志和服务器日志。假如发现长时间的GC停顿导致Kafka丢失了Zookeeper session,你可能需要为zookeeper.session.timeout.ms配置更长的timeout值。
    Kafka是否支持MQTT或JMS协议
    目前,Kafka针对上述协议不提供直接支持。但是,用户可以自己编写Adaptors从MQTT或者JMS中读取数据,然后写入到Kafka中。
     
     
     
     
     
  • 1 Big Data Solution


    1.1 HP

    HP BigData 1.png

    HP BigData 2.png


    1.2 Oracle

    Oracle BigData 101.png

    Oracle BigData 1.JPG


    1.3 IBM

    IBM BigData 102.jpg