Coder Social home page Coder Social logo

articles's People

Contributors

knight-wu avatar

Stargazers

 avatar  avatar

Watchers

 avatar

Forkers

linecomparison

articles's Issues

spark

spark执行的大致流程

To summarize, the following phases occur during Spark execution:

  1. User code defines a DAG (directed acyclic graph) of RDDs
    Operations on RDDs create new RDDs that refer back to their parents, thereby
    creating a graph.
  2. Actions force translation of the DAG to an execution plan
    When you call an action on an RDD it must be computed. This requires computing
    its parent RDDs as well. Spark’s scheduler submits a job to compute all needed
    RDDs. That job will have one or more stages, which are parallel waves of
    computation composed of tasks. Each stage will correspond to one or more RDDs in
    the DAG. A single stage can correspond to multiple RDDs due to pipelining.
    Tasks are scheduled and executed on a cluster
  3. Stages are processed in order, with individual tasks launching to compute segments
    of the RDD. Once the final stage is finished in a job, the action is complete.

image

task、partition关系

  1. stage里面的task = 当前RDD其依赖或上一次的RDD partition,若是从file生成的RDD依赖指定的partition数量
  2. task由scheduler 指定partition所在的node去执行, 等于说哪个节点保存这个partition, 由这个节点去计算task.

The number of tasks in a stage is the same as the number of partitions in the last RDD in the stage. The number of partitions in an RDD is the same as the number of partitions in the RDD on which it depends, with a couple exceptions: the coalesce transformation allows creating an RDD with fewer partitions than its parent RDD, the union transformation creates an RDD with the sum of its parents’ number of partitions, and cartesian(笛卡尔) creates an RDD with their product.

RDDs produced by textFile or hadoopFile have their partitions determined by the underlying MapReduce InputFormat that's used. Typically there will be a partition for each HDFS block being read. Partitions for RDDs produced by parallelize come from the parameter given by the user, or spark.default.parallelism if none is given.

  • To determine the number of partitions in an RDD
    rdd.partitions().size().

lineage(血统)

each RDD has a set of partitions, which
are atomic pieces of the dataset; a set of dependencies on
parent RDDs, which capture its lineage; a function for
computing the RDD based on its parents; and metadata
about its partitioning scheme and data placement.

  • dependencies between RDDs

    • narrow dependencies, where
      each partition of the child RDD depends on a constant
      number of partitions of the parent (not proportional(比例) to its
      size)
    • wide dependencies, where each partition of the
      child can depend on data from all partitions of the parent.
      For example, map leads to a narrow dependency, while
      join leads to to wide dependencies (unless the parents are
      hash-partitioned)
  • stage

  1. 从narrow到wide dependency的时候需要产生新的stage
  2. 产生action动作的时候会产生新的stage

Each stage contains as many pipelined transformations
with narrow dependencies as possible. The
boundaries of the stages are the shuffle operations required
for wide dependencies wide dependency or any cached partitions
that can short-circuit the computation of a parent RDD.

  • fault-tolerant

    • narrow dependency

    lost partition can be recomputed in parallel on other nodes

    • wide dependency

    node failure in the cluster may result in the loss
    of some slice of data from each parent RDD, requiring
    a full recomputation

    • checkpoint

    可以使用persist() 保存一个checkpoint, 不需要从血统的起点开始计算

  • lineage与DAG的区别

lineage 描述的是RDD的依赖关系, 依赖链, 如图1; 而DAG 是有向无环图, 节点是rdd, 边是rdd的转化关系, 并能区分stage, 如图2

Above figure depicts an RDD graph, which is the result of the following series of transformations:

val r00 = sc.parallelize(0 to 9)

val r01 = sc.parallelize(0 to 90 by 10)

val r10 = r00 cartesian df01

val r11 = r00.map(n => (n, n))

val r12 = r00 zip df01

val r13 = r01.keyBy(_ / 20)

val r20 = Seq(r11, r12, r13).foldLeft(r10)(_ union _)

图1
图2

RDD (Resilient Distributed Dataset)

  1. 在内存中计算, 内存中放不下遵循LRU(最近最少使用算法)将其余置换到disk
  2. 懒计算, 直到执行action 操作, 才会去计算RDD
  3. 容错性

使用lineage (血统) 可以在其他节点并行计算failed partition of RDD, 如果有备份则可以直接计算,更快; 否则要根据上次计算的结果重新计算.
若driver故障, 则所有executor的计算结果都会丢失
设置replication, 参考 RDD Persistence , 使用这个配置: MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.

  1. 不可变性(Immutability)

一旦创建了rdd, 就是不能修改的, 除非生成新的 rdd, 避免了并发计算的问题, 而且每次 rdd transformation是确定的

rdd.toDebugString() 可以打印出rdd 的生成链

  • rdd, dataframe, dataset的区别
    参考 infoQ

rdd属于最底层的抽象, 提供最复杂的操作; dataFrame, dataSet提供某些场景下定制化的api, 使代码更加明确和易用, 也提供了性能上的优化, 适用于半结构化的数据, 例如json; 而dataSet 可以作为有类型的抽象, 将数据映射到java对象上, 对语法错误和解析错误提供编译时检查(说白了可以在编译时检查数据类型), dataFrame可以当做是dataSet 是无类型的

spark persist

参考自 https://github.com/JerryLead/SparkInternals/blob/master/markdown/6-CacheAndCheckpoint.md

  • checkPoint

分为reliable 和local 两种.

  1. reliable

SparkContext.setCheckpointDir(directory: String) to set the checkpoint directory, 目录必须是hdfs路径, 因为 checkPoint file实际上是保存在executor 机器上的.

用法: RDD.checkpoint() , 当前rdd被保存, 对 parent rdd的引用都会被移除; 每个job执行完之后, 会往前回溯所有的RDD, 若需要checkpoint, 则标记为 CheckpointingInProgress, 最后启动一个新的job 完成Checkpoint.
job完成 checkpoint之后, 会将rdd的所有 dependency释放掉, 设置该rdd的状态为 checkpoint, 并为该rdd 设置一个强依赖, 设置该rdd的 parent rdd为 CheckpointRDD, 该 CheckpointRDD 负责读取文件系统的 Checkpoint文件, 生成对应rdd 的 partition.

checkpoint 也是lazy的, 触发了action之后, 才会往前回溯到需要checkpoint的RDD 进行checkpoint.
checkpoint的时候, 会重新起一个新的job 去计算需要checkpoint 的rdd, 所以一般在checkpoint 之前先执行 cache, 则后续的checkpoint 的过程就直接把内存中的数据持久化到硬盘中了, 省去了重复计算.

  • cache

将中间数据存储在内存中, 以便其他job使用. 当task计算某个rdd的partition, 如果该partition需要cache, 则计算后存到内存.

  • cache和checkpoint的区别

cache之后, 整个lineage 还会保留, 但是cache的数据不能在多个driver program之间共享; 但是 checkpoint 之后,会把 lineage全部删除, 因为是持久化到 hdfs的, 可以供其他job使用;

  • 与mr的checkpoint的区别

Hadoop MapReduce 在执行 job 的时候,不停地做持久化,每个 task 运行结束做一次,每个 job 运行结束做一次(写到 HDFS)。在 task 运行过程中也不停地在内存和磁盘间 swap 来 swap 去。 可是讽刺的是,Hadoop 中的 task 太傻,中途出错需要完全重新运行,比如 shuffle 了一半的数据存放到了磁盘,下次重新运行时仍然要重新 shuffle。Spark 好的一点在于尽量不去持久化,所以使用 pipeline,cache 等机制。用户如果感觉 job 可能会出错可以手动去 checkpoint 一些 critical 的 RDD,job 如果出错,下次运行时直接从 checkpoint 中读取数据。唯一不足的是,checkpoint 需要两次运行 job

spark shuffle

参考自 SparkInternals-shuffleDetails

  • 简而言之, 是再次分布数据的过程.例如 reduceByKey(), 需要在所有的分区找到所有的key的所有value ,并把所有的value聚合到一起计算.

  • 具体流程, 如下图所示
    image

shuffle会产生两个stage, 分别对应 shuffle write和shuffle read
shuffle write: 可以当做mapper阶段, 第一个stage 中每个task中的记录, 通过 partitioner.partition(record.getKey())) (默认是HashPartitioner), 会被分散到 bucket上, 每个task 对应的bucket的数量 == reducer的数量 == 下一个stage的task的数量
shuffle read: 可以当做reducer阶段, 第二个stage 中的task, 根据task的id 和mapper的id从远端或者本地的 bucket上面 fetch 记录进行reduce.

  • reducer端如何进行fetch

若是reduceByKey, 则只需要持有一个hashMap, key为record的key, val则按照 record的次数进行更新; 但若是groupByKey, 需要

  • 性能调优

    • 提升shuffle的内存占比, 尽量避免shuffle的时候数据因为内存不够被刷写到磁盘中.

    spark.shuffle.memoryFraction , shuffle可以使用executor的内存占比, 默认0.2 , 可适当提高该比例

    • 提高shuffle操作的并行度
    • spark.sql.shuffle.partitions 提高sparkSql中shuffle类操作的并行度, 默认是200, 对应200个shuffle read tasks
  • 这篇博文还需再看 link

spark 资源分配

  • executor core
    --executor-cores in shell(or in conf spark.executor.cores) 5 means that each executor can run a maximum of five tasks at the same time。

    A rough guess is that at most five tasks per executor can achieve full write throughput, so it’s good to keep the number of cores per executor below that number

  • executor memory
    the heap size can be controlled with the --executor-memory flag or the spark.executor.memory property
  • spark.yarn.executor.memoryOverhead
    指的是 off-heap memory per executor, 用来存储 VM overheads, interned strings, other native overheads, 默认值是 Max(384MB, 10% of spark.executor-memory), 所以每个executor的实际物理内存需要囊括spark.yarn.executor.memoryOverhead 和executor memory两部分.
  • spark memory model
    link
  • num of executor
    --num-executors command-line flag or spark.executor.instances configuration property control the number of executors requested

    Starting in CDH 5.4/Spark 1.3, you will be able to avoid setting this property by turning on dynamic allocation with the spark.dynamicAllocation.enabled property. Dynamic allocation enables a Spark application to request executors when there is a backlog of pending tasks and free up executors when idle.

  • yarn.nodemanager.resource.memory-mb
    controls the maximum sum of memory used by the containers on each node.
  • yarn.nodemanager.resource.cpu-vcores
    controls the maximum sum of cores used by the containers on each node.

YARN may round the requested memory up a little. YARN’s yarn.scheduler.minimum-allocation-mb and yarn.scheduler.increment-allocation-mb properties control the minimum and increment request values respectively.

  • 跑批任务中(hive on spark, hive on mr)的资源分配

spark.executor.instance(实例数), spark. executor.memory(单个实例内存), 这两个参数可以根据实际情况进行相应调整。当处理数据量较大逻辑较为简单时可以增大实例数减小内存,提高批处理的并行数;当处理数据量不大但是逻辑较为复杂的任务时可以提高单个实例的内存,减小实例数,提升每个实例的处理能力。

  • spark 的并行程度

推荐 executor instances * core per executor 的两到三倍 = num of task , 避免cpu浪费(某些task 已经跑完了, 可以跑剩下的task)

  • 如何设置并行度
  1. spark sql

spark.sql.shuffle.partitions=[num_tasks]

  1. spark.defalut.parallelism , 只在shuffle的时候才生效,
val rdd2 = rdd1.reduceByKey()  //若设置 spark.defalut.parallelism=10, rdd2的分区数就是10
  1. 源数据在hdfs上面,
sparkContext.textFile(String filePath,int minPartition)
//  minPartition 决定了这个文件被分成几片, 
split size  = 总文件大小/ minPartition,
actual split size = Math.max(mapred.min.split.size,Math.min(split size,file block size(默认128MB)))
再为每个split 创建一个task
  1. 可以repartition
  2. val rdd3 = rdd1.join(rdd2)

rdd3里面partiiton的数量是由父RDD中最多的partition数量来决定,因此使用join算子的时候,增加父RDD中partition的数量

spark 性能调优

美团点评spark基础篇

  1. 避免创建重复RDD
    2.尽量重复使用RDD
    3.对多次使用的RDD持久化
    4.尽量避免shuffle类算子
    5.使用map端预聚合的算子, 类似于MR的combiner
    6.使用高性能算子

数据倾斜

某个parttion的大小远大于其他parttion,stage执行的时间取决于task(parttion)中最慢的那个,导致某个stage执行过慢

  • 情形暂定为两种
    • 相同的key的数据量太大
    • 不同的key在同一个parttion的数据量太大

由于同一个Stage内的所有Task执行相同的计算,在排除不同计算节点计算能力差异的前提下,不同Task之间耗时的差异主要由该Task所处理的数据量决定。

  • 提升并行程度,
  • 在hive端就去除倾斜的现象, 保证spark端使用的时效
  • 过滤少数导致数据倾斜的key
  • 将key添加随机前缀, 进行局部聚合,然后去掉随机前缀, 再进行全局聚合
  • 改变parttion的数量
  • 自定义parttion
  • 避免shuffle

driver的作用

  1. 生成lineage, 将用户提交的程序划分成多个task去执行,
  2. 作为调度的角色, 提交请求到executor

spark sql和presto的区别

  • spark sql更方便进行数据的分析, 适合做OLAP(On-Line Analytical Processing)
  • presto更适合做实时的响应, 更适合做交互式查询, 在数据量和内存差不多的时候性能较好, 因为完全基于内存做计算,适合做OLTP(on-line transaction processing)
  • presto的社区人数相比spark而言比较少, 出现问题不容易解决

hive on spark, hive on MR 和spark sql的区别

  • hive on spark 和spark sql 计算引擎都是spark, 只是从sql翻译成执行计划不一样

hive on spark, sql的优化都是通过hive的, 这方便hive的经验比较丰富, 最后启动的还是一个spark job

spark sql 默认使用hive metastore去管理metadata, 使用spark 自身的sql 优化器: catalyst.

  • 若要讨论spark和MR的区别

    1. spark 容易编程, 不需要过多的抽象;MR需要较为复杂的抽象; spark支持多种算子, 而MR只支持map和reduce, 功能没有spark 丰富和易用.
    2. spark支持内存和硬盘以及混合存储三种方式, 而mr只支持hdfs一种, 这个是spark比较快的一个重要原因.
    3. spark的任务分配是更细粒度的, 例如划分了多个rdd, 中间有任务失败不需要从头开始计算; 而mr需要从头
    4. spark默认是lazy compution, 可以对中间过程进行很多优化
    • 性能对比, hive on spark , hive on mr, spark sql ,hive on tez
      2014-benchmark,
      总而言之, hive on spark在large join时性能突出, spark sql在map join时性能突出, hive on mr比较慢

spark-submit

  • shell 脚本

#!/bin/sh
set -o nounset
# 第一个错误, shell终止执行
set -o errexit 
export SPARK_HOME=
lib_path=
tempView=mblTempView
outputDir=/user/wutong/mblOutput
mblFile=mbl.txt
hdfs dfs -rm -r -f ${outputDir}
${SPARK_HOME}"/bin/spark-submit" \
--class com.mucfc.cms.spark.job.mainClass.PushMblToMerchantShellMain \
--master yarn \
--deloy-mode client \
--queue root \
--diver-class-path ${lib_path}"*" \
--jars ${lib_path}"" \
--conf spark.default.parallelism=200 \
--conf spark.sql.shuffle.partitions=400 \
--conf spark.executor.cores=3 \
--conf spark.executor.memory=450m \
--conf spark.executor.instances=4 \
${lib_path}"cms-sparkintegration.jar" "$@" \
-file_name 
-insertSql "insert overwrite table crm_appl.mbl_filter select * ${tempView}" \
-tempView ${tempView} \
-outputSql "select * from crm_app.mbl_filter" \
-showSql "select * from ${tempView}" \
-outputDir ${outputDir} \
-inputPartitionNum 200 \
-outputPartitionNum 200 \
-inputFlag true \
-appNme PushMblToMerchantShellMain \
-local_file_path ${lib_path}${mblFile}
-remote_file_path /user/wutong/mblFile \
-countForShortMbl 3000
-encrypt_type md5

spark sql 执行的大致流程

image

参考自

  1. sparksql-catalyst
  2. mit.edu-sigmod_spark_sql.pdf , 百度云网盘/book

首先将sql 语句通过parser模块解析成语法树, 称作 unresoloved logical plan, 这个plan再通过 Analyzer借助元数据解析为 logical plan, 再通过基于规则的优化, 得到 optimized logical plan, 此时执行计划依然是逻辑的, 并不能被spark 理解, 还需要转化为 physical plan.

  1. parser

将一长串sql 解析为一个语法树, 相当于是划分成一个个token, 并指定了token执行的先后顺序, 该模块基本都使用第三方类库 antlr实现.

  1. analyzer

遍历第一步提到的语法树, 将一个个token替换为具体的函数, 例如token为sum(), 则替换为具体的聚合函数; 并做数据类型的绑定, 定位到那个表的哪个列.

  1. optimzer

分为基于规则优化和基于代价优化(目前支持还不好, 具体说来是比较多种规则优化下哪个的时间代价越小, 就采用哪些规则进行优化), 规则优化例如谓词下推(Predicate Pushdown), 例如是filter下推到join之前, 先进行过滤再join, 减少大量数据; 常量累加(Constant Folding), 将中间值事先计算, 得出一个中间结果; 和列值裁剪(Column Pruning), 例如只要一列数据的, 就只传递该列数据, 减少大量的io和带宽.

  1. 将逻辑执行计划转化为物理执行计划, 转化为特定的算子进行计算.
  • 查看spark sql执行计划
  1. 使用queryExecution方法查看逻辑执行计划,使用explain方法查看物理执行计划, 在spark-sql 命令行, 执行
spark.sql("xxxsql").queryExecution()
spark.sql("xxxsql").explain()
  1. 使用Spark WebUI进行查看

spark join

分为 shuffle hash join、broadcast hash join以及sort merge join

  • hash join

将小表作为Build Table,大表作为Probe Table; 先将小表的join key, hash到bucket中, 构建hashtable, hashtable如果太大, 会放到磁盘上; 再讲大表的join key进行hash到同一个bucket中, 再判断两者的key是否相同. > 时间复杂度: O(a+b), 传统的笛卡尔积是 O(a*b)

选择小表作为build table, 生成的hashTable比较小, 能够完全放到内存中.

  • hash join的分布式改造
  1. broadcast hash join

将小表广播到大表的所有节点上, 适用于小表很小的情况

SparkSQL规定broadcast hash join执行的基本条件为被广播小表必须小于参数spark.sql.autoBroadcastJoinThreshold,默认为10M。

  1. Shuffle Hash Join

将两个表按照join key进行重分区(HashPartition) , 再在各个节点上进行hash join, 适用于一个大表,一个小表的情况.

  1. Sort-Merge Join
    可以看下 spark-join-pull过程

将两个表先进行shuffle, 再在各个分区节点的数据进行sort, 最后再根据join key 进行merge. 适用两个大表的情况, 因为spark的shuffle是 sort-based shuffle,shuffle之前就排序好了.

spark log

分为三部分: driver log, executor log, spark history server log

  • driver log

使用 yarn logs -applicationId xxxId > tmp.log

  • executor log

在下图目录下, 需要去hdfs 找applicationId下的目录
yarn-log

  • spark history server log

配置的路径

history-server-log

也可以去spark history server直接看, 但是可能会找不到, 需要看配置. 包含了spark application执行的整个流程和rdd的细节.


疑问

  • 还是要把本地windows环境起起来, spark如何指定 scala, hadoop
  • spark 序列化
  • 基准测试 benchmark
  • spark集群容错, 如果没有保留中间结果, 则如何重新计算, 从哪里开始 ?

工作踩坑指南

spark 任务提交

  • spark和scala的包都指定为provided,在客户端上指定SPARK_HOME和SCALA_HOME就可依赖到,避免版本问题

    • 如果SPARK_HOME有问题则,会出现

    ClassNotFound: sparkSession

    • 出现 xx.scala.xx error, 一般是scala 包有冲突
  • 环境变量配置好后,用source生效,然后打开一个新的终端去重启应用才可以取到新的环境变量

  • 需要配置分离,不用打包就可以改配置

  • hive元数据库(mysql),hive存储文件的位置(hdfs)在切换集群时都需要更换

  • 指定hadoop版本

参考 Stack Overflow
参考 hadoop-provided

  • 启动脚本

在spark-class2.cmd 有一个 LAUNCHER_OUTPUT, 默认启动完会删除, 可以保留看下启动的脚本, 帮助排查问题

  • windows环境下启动spark, 有可能脚本报错, 然后报错信息是乱码, 可以把spark的输出指定 GBK解码, 可能是因为中文环境下windows cmd的编码是中文编码, 所以用utf-8解码是乱码, 看到报错信息之后就很好排查问题了.

待看

经典资料

  1. https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
  2. lhttps://github.com/JerryLead/SparkInternals

hbase

region架构
region架构

hdfs文件映射
hdfs文件映射.jpg

hbase 特点

  • 底层存储基于hdfs, 可存储大量数据
  • 分布式存储,基于region可拆分,compaction等特性,良好的读写性能
  • 主要基于列存储, 非常易扩展,可存储非结构化数据

rowKey 的设计

  • tall-Narrow 模式
    例如查找查找一个用户的所有邮件
    可以这样设计
    userId-date-messageId-attachmentId
    可以用prefixFilter查询

  • flat-wide mode
    userId为rowKey, valueId为Column key, 很多相同的rowKey,

  • 为什么tall模式比wide模式快
    因为HFile的下一级存储模式是key-value,
    结构如下:

    可以更快的获取到column qualifier

hbase 命令

// scan meta for the table, get region info
 scan 'hbase:meta',{FILTER=>"PrefixFilter('table')"}
 
//  filterlist
f_keyonly = org.apache.hadoop.hbase.filter.KeyOnlyFilter.new();
f_firstkey = org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter.new();
flist = org.apache.hadoop.hbase.filter.FilterList.new([f_keyonly, f_firstkey]);
scan 'mytable', {STARTROW => 'myStart', ENDROW => 'myEnd', FILTER =>  flist }

// rowcount
hbase org.apache.hadoop.hbase.mapreduce.RowCounter <tablename>

Usage: RowCounter [options] 
    <tablename> [          
        --starttime=[start] 
        --endtime=[end] 
        [--range=[startKey],[endKey]] 
        [<column1> <column2>...]
    ]

// 重启
sh hbase-daemon.sh restart regionserver

hbase 架构

RS下有多个region, 根据rowkey的分布均匀分布在多个region; 一个table的数据分布在多个region, 一个CF对应一个store, 一个memstore, 一个store下面对应多个storeFile,一个storeFile由多个hdfs的block组成

hbase 读流程

  1. 从zk上获取hbase:meta表的所在的RS,可以通过zookeeper命令(get //meta-region-server)查看该节点信息
  2. 从meat表获取row所在的region ,并且meta表的信息会被客户端加载到缓存 可以用 scan 'hbase:meta' 来获取该表的信息.meta表的结构
  3. region信息被更新, 例如split等后, 会更新meta表

hbase写流程

  • 大致流程
  1. 本地put, autoFlush默认为true, 每次put都会执行一次RPC ,把数据送到RS, 可把autoFlush改为false, 达到一定的writeBuffer, 才会把数据批量提交
  2. RS接收到请求进行校验后, 先写入WAL(保证可靠性), 再写memstore(提升效率), 把随机写变成了一次内存写和顺序写
  3. memstore达到一定大小再异步flush进入hdfs

Hbase 写入流程(网易-范欣欣)

  • 写性能优化
    • 根据业务要求, 看吞吐量和数据准确性之间的权衡, 默认是同步写WAL, 可以改成异步写或者不写.
    • put可以改成同步批量提交, 减少到RS的RPC连接数; 也可以改成异步批量提交, autoFlush(默认为true)改为false, 但是客户端异常会导致数据丢失
    • 提高region的数量, 在Num(Region of Table) < Num(RegionServer)的场景下切分部分请求负载高的Region并迁移到其他RegionServer
    • 考虑写入的请求是否不均衡, 看rowKey的设计是否合理, 或者可以采用预分区的策略
    • 在写入很大的keyValue时,例如文件等数据, 导致RS的handler耗尽, 目前hbase-2.0已经采用hbase MOB,极大的提升了性能

hive数据批量写入hbase

  • 目前采用的是ftp文件到hbase客户端批量put的方式, 稳定性差, 容易引起split和compaction, 产生大量对象, GC频繁, 影响在线系统的查询, 可以采用hbase自带的bulkload, 通过将hive 的底层存储文件格式转化为hfile 导入hbase,

问题

  • Sync HLog:HLog真正sync到HDFS,在释放行锁之后执行sync操作是为了尽量减少持锁时间,提升写性能。如果Sync失败,执行回滚操作将memstore中已经写入的数据移除
  • WAL 的作用,
  • sync到文件系统和到磁盘有何区别
  • 写为什么会对读产生影响
    答: 可能产生region split, 可能产生compaction, 其他的性能影响, IO等

Memstore Flush

参考自 link

HBase会在如下几种情况下触发flush操作,需要注意的是MemStore的最小flush单元是HRegion而不是单个MemStore。可想而知,如果一个HRegion中Memstore过多,每次flush的开销必然会很大,因此我们也建议在进行表设计的时候尽量减少ColumnFamily的个数。

  • Memstore级别限制
    当Region中任意一个MemStore的大小达到了上限(hbase.hregion.memstore.flush.size,默认128MB),会触发Memstore刷新。

  • Region级别限制
    当Region中所有Memstore的大小总和达到了上限(hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size,默认 4* 128M),会触发memstore刷新。

  • Region Server级别限制
    当一个Region Server中所有Memstore的大小总和达到了上限(hbase.regionserver.global.memstore.upperLimit * hbase_heapsize,默认 40%的JVM内存使用量),会触发部分Memstore刷新。Flush顺序是按照Memstore由大到小执行,先Flush Memstore最大的Region,再执行次大的,直至总体Memstore内存使用量低于阈值(hbase.regionserver.global.memstore.lowerLimit * hbase_heapsize,默认 38%的JVM内存使用量)。当一个Region Server中HLog数量达到上限(可通过参数hbase.regionserver.maxlogs配置)时,系统会选取最早的一个 HLog对应的一个或多个Region进行flush

  • HBase定期刷新Memstore
    默认周期为1小时,确保Memstore不会长时间没有持久化。为避免所有的MemStore在同一时间都进行flush导致的问题,定期的flush操作有20000左右的随机延时。

  • 手动执行flush
    用户可以通过shell命令 flush ‘tablename’或者flush ‘region name’分别对一个表或者一个Region进行flush。

Compaction

  • 流程

目的选择文件进行合并, **是选择文件小且io负载重的文件, 有几个文件选择算法: RatioBasedCompactionPolicy、ExploringCompactionPolicy和StripeCompactionPolicy

  • 触发时机
  1. memstore flush, flush之后, region下面的所有store会判断各自的storeFile的数量是否大于某个值,若大于, 则会compaction.
  2. 周期性检查, 先检查文件数量是否大于某个值, 若不满足, 则检查是否满足major compaction的条件, 判断是否是MajorCompaction 首先基础间隔是 hbase.hregion.majorcompaction = 七天 , 并根据majorcompaction.jitter 作浮动, 根据 storeFiles中文件最早的修改时间距离今天已经超过了间隔时间, 则进行MC 可通过配置 hbase.hregion.majorcompaction = 0 来全局关闭 MC.
  3. 手动触发
  • 文件选择策略
  1. 排除正在compaction的文件
  2. 排查单个过大的文件, 如果文件大小大于hbase.hzstore.compaction.max.size(默认Long最大值),则被排除,否则会产生大量IO消耗

经过排除的文件称为候选文件, 然后会再判断是否满足major compaction, 如果满足则选择全部文件进行合并, 只要满足以下三个条件中的一个, 就会执行MC

  1. 用户强制执行major compaction
  2. 长时间没有进行compact(CompactionChecker的判断条件2)且候选文件数小于hbase.hstore.compaction.max(默认10)
  3. Store中含有Reference文件,Reference文件是split region产生的临时文件,只是简单的引用文件,一般必须在compact过程中删除

如果不满足major compaction条件,就必然为minor compaction,HBase主要有两种minor策略:RatioBasedCompactionPolicy和ExploringCompactionPolicy,

  • major compaction和minor compaction有何区别?

1.Minor操作只用来做部分文件的合并操作以及包括minVersion=0并且设置ttl的过期版本清理,不做任何删除数据、多版本数据的清理工作。

2.Major操作是对Region下的HStore下的所有StoreFile执行合并操作,最终的结果是整理合并出一个文件。

There are two types of compactions: minor and major. Minor compactions will usually pick up a couple of the smaller adjacent StoreFiles and rewrite them as one. Minors do not drop deletes or expired cells, only major compactions do this. Sometimes a minor compaction will pick up all the StoreFiles in the Store and in this case it actually promotes itself to being a major compaction.

After a major compaction runs there will be a single StoreFile per Store, and this will help performance usually. Caution: major compactions rewrite all of the Stores data and on a loaded system, this may not be tenable; major compactions will usually have to be done manually on large systems.

  • hbase major和minor compaction源码解析(待测试和根据博文hbase compaction 深入研究,了解几个策略的不同)

  • 前言
    本来只想围绕 触发条件, 触发周期, 内部流程, 外部影响, 如何控制几个方面来看这个代码, 忽略不重要的细节, 以了解整体流程为主要目的, 相信这也是看源码的一个指导方针吧(不然只见树木不见森林),但是从入口这里 线程池的抽象挺有意思.

  • ChoreService,chore原意为工人,这里可以把这个类理解为包工头,内部有个线程池和一个管理着每个工人返回结果的map
    工人默认每隔十秒钟检查一次, compactionChecker里面有个mutilper, 变成每隔 compactionchecker.interval.multiplier(默认1000) * thread.wakefrequecy(默认10 * 1000) 毫秒执行一次check

  • 判断是否进行minor compaction
    基于compaction policy来判断, 当前1.2.0版本的默认policy 是 RatioBasedCompactionPolicy, 根据当前的 num of storeFiles - num of compaction files > minFilesToCompact(默认为3)

  • 判断是否是MajorCompaction
    首先基础间隔是 hbase.hregion.majorcompaction = 七天 , 并根据majorcompaction.jitter 作浮动, 根据 storeFiles中文件最早的修改时间距离今天已经超过了间隔时间, 则进行MC
    可通过配置 hbase.hregion.majorcompaction = 0 来全局关闭 MC.

  • 可在 建表的时候指定 COMPACTION => false来关闭所有的compaction

hbase shell>create 'cdp_table_test', {NAME =>'crm', VERSIONS => 2 , COMPRESSION => 'SNAPPY'}, {SPLITS => ['8888|']}, {METADATA => {'SPLIT_POLICY' => 'org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy'}}, {MAX_FILESIZE => '107374182400'}

region split

  • 产生条件
    smaller(设置的最大的region size(默认10 GB), current region number的立方 * 2 * memory flush size)

  • compaction和split为何会影响 IO? 亦或是其他?需要实验

关闭 auto split

建表时设置拆分策略为 ConstantSizeRegionSplitPolicy, 并指定最大的region size 为100 GB,所有的store files size 总和超过才拆分, 若想导数快点, 则可先预分区, 待导数完毕后再关闭自动拆分.

手动触发major compaction

hbase-majoralltable.sh

zookeeper在hbase中的作用

  • 存储hbase元数据(hbase:meta表所在的RS的信息)
  • 可以保证节点信息在分布式环境下的同步

dataType

  • versioning
    可以无限制的put 任何数量的版本,超过 MaxVersion的其他更旧的记录会在Major compaction触发的时候删掉

hbase important configuration

  • zookeeper.session.timeout
    默认3分钟, 意思是超过这个时间, HMaster才会发现然后恢复, 设置得太小,容易导致GC也会被认为 RS down

spring

  • spring的理解

    spring是以IOC和AOP为核心的众多组件的总称, 其他还包括spring-boot, spring-mvc等组件, 涵盖了后台开发的从前端到后台交互, 后台分层, 中间件, 数据库等持久层等几乎方位的支持.

spring IOC

  • 大致**
    首先spring的控制反转来源于依赖倒置(Dependency Inversion Principle )的**,
    传统的依赖顺序是高层依赖底层, 而依赖倒置指的是底层依赖高层. 例如车子的建造, 车子依赖车身, 车身依赖底盘, 底盘依赖车轮, 若需要改动车轮的规格, 则以上所有东西都需要改, 若反过来, 先定好车子的样子, 车身 -> 车子, 底盘 -> 车身, 轮子 -> 底盘, 要改车轮, 不会影响其他.

  • 概念关系
    依赖倒置是**, 控制反转是实现该**的一个思路, 依赖注入是该思路的具体实现方法, 依赖注入可以简要说是把底层类作为参数传递给上层类, 实现上层对下层的控制, 而IOC容器则封装了底层类的构造方法, 不需要知道底层类的实现细节以及 具体的构造函数的如何调用, 只需要按照依赖注入即可.


以下内容均参考 spring-framework-reference/core/beans-annotation-config

  • xml和annotation两种方式比较

    annotation使配置更简洁, 和源码放在一起, 更精确, 但是配置比较分散, 不在一个地方很难控制 ; xml的方式不需要去动源码, 所以不需要重新编译,

  • @required@Autowired 比较

    前者如果没有注入成功, 会报错, 且只能用于setter method.; 后者可以设置是否required(默认true) ,并且当无法autowire时会自动忽略, 适用于constructor, field , setter method or config method.

  • @Autowired

    按照byType注入, 可以设置required=false, 找不到bean时也不报错, 后续可能报NPE.

    • 用于constructor, 当构造函数有多个时, 可以将指定一个进行autowired, 否则将默认以贪婪模式(参数最多的构造函数进行构造).

    • 获取所有类型的bean

    It is also possible to provide all beans of a particular type from the ApplicationContext by adding the annotation to a field or method that expects an array of that type:

    public class MovieRecommender {
    
    @Autowired
    private MovieCatalog[] movieCatalogs;
    
    }
    
    public class MovieRecommender {
    
    private Set<MovieCatalog> movieCatalogs;
    
    @Autowired
    public void setMovieCatalogs(Set<MovieCatalog> movieCatalogs) {
        this.movieCatalogs = movieCatalogs;
    }
    
    // ...
    }
    
  • @qualifier

    可以结合@autowire使用, 当某个type有多个实例时, 可以用@qualifier 指定某个id的bean注入. @qualifier@Autowired结合使用时, 和@resource几乎等同.

  • @resource

    既可以byName, 也可以byType注入, 默认byName, 就是按照bean的id注入.

    • 装配顺序
    1. 如果同时指定了name和type,则从Spring上下文中找到唯一匹配的bean进行装配,找不到则抛出异常

    2. 如果指定了name,则从上下文中查找名称(id)匹配的bean进行装配,找不到则抛出异常

    3. 如果指定了type,则从上下文中找到类型匹配的唯一bean进行装配,找不到或者找到多个,都会抛出异常

    4. 如果既没有指定name,又没有指定type,则自动按照byName方式进行装配;如果没有匹配,则回退为一个原始类型进行匹配,如果匹配则自动装配;

    问题 在细看一下@qualifier

  • @component@repository@service@controller

    如果 Web 应用程序采用了经典的三层分层结构的话,最好在持久层、业务层和控制层分别采用上述注解对分层中的类进行注释。@service用于标注业务层组件.@controller用于标注控制层组件(如struts中的action).@repository用于标注数据访问组件,即DAO组件. @component泛指组件,当组件不好归类的时候,我们可以使用这个注解进行标注。

    • @bean@component的区别

      前者是显式的返回一个object, 然后注入到application context, 让spring代管理, 例如可以适用于第三方的lib, 无法再源码上@component; 后者是让spring创建并管理, 隐式的创建.

classpath*: / 与 classpath:/

  • classpath*:/ 表示在此classpath(当前项目的classpath及类路径的classpath)下的所有兄弟和子孙路径的 classpath下满足要求的所有文件
  • classpath:/ 表示classpath下的第一层子路径的第一个符合要求的文件

build project

  • 新建maven project ,选择archetype 为webapp或site,例如 module 名为 test,module file location 和content root 均为test路径

  • web项目下相对路径和绝对路径:

    • "." 代表目前所在的目录。
    • ".." 代表上一层目录。
    • "/" 代表根目录。

前后台传输数据

  1. 前台需要把对象通过JSON.stringfy转换json字符串,后台在参数对应加上@requestbody,并且Request的content-type必须为json,不然会出现415的错误.
  1. 后台传输少量数据到前端,避免新增类文件的做法:
Map<String,Object> map = new HashMap<String,Object>();
map.put("leader",ifLeader);
map.put("teamId",teamId);
jsonResult.setData(new JSONObject(map));
  1. 日期类参数
  • 如果controller的dto和后台数据映射dto为同一个,且日期类使用java.util.Date作为转换类型,则在该参数上加两个标签,省掉格式转换和非空判断,@DateTimeFormat为接受前台数据的转换,@JSONField为后台给前台返回数据的转换。
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JSONField(format="yyyy-MM-dd HH:mm:ss")
Date startTime
  • spring-servlet.xml需要加以下配置
<mvc:annotation-driven>
        <mvc:message-converters register-defaults="true">
        <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter">
            <property name="supportedMediaTypes" value="application/json"/>
            <!--设置fastjson特性-->
            <property name="features">
                <array>
                    <!--设置null值也要输出,fastjson默认是关闭的-->
                    <value>WriteMapNullValue</value>
                    <!--设置使用文本方式输出日期,fastjson默认是long-->
                    <value>WriteDateUseDateFormat</value>
                </array>
            </property>
        </bean>
        </mvc:message-converters>
    </mvc:annotation-driven>

mybatis返回map

mapper:

<resultMap id="teamResultMap" type="com.mucfc.result.QueryTeamResult">
    <id column="id" property="teamId" jdbcType="INTEGER" javaType="int" />
    <result column="name" property="teamName"  jdbcType="VARCHAR" javaType="String"/>
</resultMap>
<select id="queryTeam"  resultMap="teamResultMap"  parameterType="int" >
    select id,name  from tb_dept WHERE parent= #{level}
</select>

// daoMybatis:
public List<QueryTeamResult> queryTeam(int level){
    return selectList("WeeklyMapper.queryTeam",level);
}

jetty启动时修改资源文件,可以通过update更新

修改/${jetty_home}/etc/webdefault.xml 将true改为false
并在pom文件中配置

<init-param>
  <param-name>useFileMappedBuffer</param-name>
  <param-value>false</param-value>
</init-param>
pom.xml
<plugin>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-maven-plugin</artifactId>
  <configuration>
  <webAppConfig>
  <contextPath>/{project.build.finalName}</contextPath>
  </webAppConfig>
  <scanIntervalSeconds>3</scanIntervalSeconds>
  <httpConnector>8080</httpConnector>
  <webAppXml>jetty.xml</webAppXml>
  <webApp>
  <defaultsDescriptor>webdefault.xml</defaultsDescriptor>
  </webApp>
  </configuration>
</plugin>

httpServletResquest

  • getAttribute()

An attribute is a server variable that exists within a specified scope i.e.:
application: available for the life of the entire application
session: available for the life of the session
request: only available for the life of the request
page (JSP only): available for the current JSP page only

  • getParameter()

returns http request parameters. Those passed from the client to the server. For example http://example.com/servlet?parameter=1.
Can only return String

spring @value

  • static field cant inject directly.
private static String test;
@Value("${test.val}")
public void setTest(String a){
    test = a;
}
  • #{configProperties.configVal} , ${configVal} 区别
    link

  • 其他用法
    link

mybatis

  • 搞清楚mybatis ${var}和#{var}的区别

Boolean 值传递

数据库类型是tinyInt, dto字段类型是Boolean或boolean,

<if test="filedName != null">
 #{fieldName}
</if>
 // 不要加非空字符串判断

spring AOP

代理技术可以说是封装了切面类(功能加强类)的逻辑和目标类的逻辑, 解耦两者的代码, 让功能加强类的逻辑能无缝插入目标类, 不产生浸入式代码.
因为以下两种原生代理, 存在以下几个问题: 
1. 不能对目标类的某些方法进行选择后进行切面
2. 还是通过硬编码的方式在代理类中, 在目标方法前后进行切面
3. 对每个类都要创建代理类, 无法实现通用
因此spring aop应运而生. 
  • pointcut 切点

    指定在哪些类和哪些方法做切入

  • advice 增强

    定义加强类的逻辑, 方法前还是方法后等

  • advisor
    切面, 将pointcut和advice结合起来

  • JDK动态代理
    只能对接口或者实现类进行方法切面,
    问题
    为什么只能基于接口做代理.

  • CGLib代理

    采用动态创建子类的方法, 因此不能对final, private等方法进行切面; 而且相比于JDK性能比较高, 创建代理对象的时间比较长, 适用于spring 多数类都是单例的情况.

  • 示例代码

// maven dependency
<!-- AspectJ dependencies -->
		<dependency>
			<groupId>org.aspectj</groupId>
			<artifactId>aspectjrt</artifactId>
			<version>${aspectj.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.aspectj</groupId>
			<artifactId>aspectjtools</artifactId>
			<version>${aspectj.version}</version>
		</dependency>
		
		
// spring application.xml config
<!-- Enable AspectJ style of Spring AOP -->
<aop:aspectj-autoproxy  />
// 下面这种方式用于CGLIB, 可以切面类的非接口方法, 否则上面那种基于JDK的代理只能切接口方法
<aop:aspectj-autoproxy  proxy-ttarget-class="true" />

@Aspect
// 注意需要依赖注入
@Component
public class EmployeeAspect {

	// 注意方法()内有两个点, 代表任意参数
	@Pointcut("execution(* com.journaldev.spring.service.*.get*(..))")
	public void pointCut(){
		System.out.println("enter pointcut");
	}
	
	@Before("pointCut()")
	public void before(){
		
	}
}
  • 切面进入的条件
    public , non static , non final ,called by class outside(非必须)
    若要在类的内部调用中执行切面, 例如类里面有两个方法A, B, 在A调用B的时候进入切面, 则需要如下写法:
    // 注意该类需要由spring管理,并生成, 若是在外部 Test t = new Test(); t.methodA();或者 t.methodB(); 均不能进入切面
    @Component
    public class Test{
    @Resource Test test
        public void methodA(){
            // something
            test.methodB();
            // something
        }
        
        public void methodB(){}
        
    }

spring事务


  • 数据并发问题
  • 脏读(dirty read), A事务读到了B事务尚未提交的数据, 可理解为读到了脏数据, 若此时B事务回滚, 则会产生数据不一致的情况.
Transaction 1	                                          Transaction 2
/* Query 1 */
SELECT age FROM users WHERE id = 1;
/* will read 20 */
                                                        /* Query 2 */
                                                        UPDATE users SET age = 21 WHERE id = 1;
                                                        /* No commit here */
/* Query 1 */
SELECT age FROM users WHERE id = 1;
/* will read 21 */
                                                        ROLLBACK; /* lock-based DIRTY READ */

  • 不可重复读(unrepeatable read)
    因为select可能不需要锁, 或者发生在两次select锁获取之前, 就会读到不可重复读取的数据, 根据不同的隔离级别, 会有不同的数据.

At the SERIALIZABLE and REPEATABLE READ isolation levels, the DBMS must return the old value for the second SELECT. At READ COMMITTED and READ UNCOMMITTED, the DBMS may return the updated value; this is a non-repeatable read.

  • 有两种策略来处理此种情况
    1. 延迟事务2的执行直到事务1提交或者回滚, 等于说维护一个时序: serial schedule T1, T2.
    2. 为了获取更好的性能,让事务2被先提交, 事务1的执行必须满足时序 T1, T2, 若不满足事务1被回滚.

在不同模型下, 结果会不一样

Using a lock-based concurrency control method, at the REPEATABLE READ isolation mode, the row with ID = 1 would be locked, thus blocking Query 2 until the first transaction was committed or rolled back. In READ COMMITTED mode, the second time Query 1 was executed, the age would have changed.

Under multiversion concurrency control, at the SERIALIZABLE isolation level, both SELECT queries see a snapshot of the database taken at the start of Transaction 1. Therefore, they return the same data. However, if Transaction 1 then attempted to UPDATE that row as well, a serialization failure would occur and Transaction 1 would be forced to roll back.

At the READ COMMITTED isolation level, each query sees a snapshot of the database taken at the start of each query. Therefore, they each see different data for the updated row. No serialization failure is possible in this mode (because no promise of serializability is made), and Transaction 1 will not have to be retried.(这段怎么跟之前说的 read commited 不一样 ? )

  • 幻象读(Phantom reads),是不可重复读的特例, 相当于是范围读.
    Transaction 1	                                        Transaction 2
/* Query 1 */
SELECT * FROM users
WHERE age BETWEEN 10 AND 30;
                                                /* Query 2 */
                                                INSERT INTO users(id,name,age) VALUES ( 3, 'Bob', 27 );
                                                COMMIT;
/* Query 1 */
SELECT * FROM users
WHERE age BETWEEN 10 AND 30;
COMMIT;


Note that Transaction 1 executed the same query twice. If the highest level of isolation were maintained, the same set of rows should be returned both times, and indeed that is what is mandated to occur in a database operating at the SQL SERIALIZABLE isolation level. However, at the lesser isolation levels, a different set of rows may be returned the second time.

In the SERIALIZABLE isolation mode, Query 1 would result in all records with age in the range 10 to 30 being locked, thus Query 2 would block until the first transaction was committed. In REPEATABLE READ mode, the range would not be locked, allowing the record to be inserted and the second execution of Query 1 to include the new row in its results.


Transaction 1	                                    Transaction 2
/* Query 1 */
SELECT * FROM users WHERE id = 1;
                                                    /* Query 2 */
                                                    UPDATE users SET age = 21 WHERE id = 1;
                                                    COMMIT; 
                                                    /* in multiversion concurrency
                                                    control, or lock-based READ COMMITTED */
/* Query 1 */
SELECT * FROM users WHERE id = 1;
COMMIT; 
/* lock-based REPEATABLE READ */

Isolation level		Dirty reads	    Non-repeatable reads	            Phantoms
Read Uncommitted	may occur	    may occur(1)	                    may occur(2)
Read Committed		don't occur	    may occur(1)	                    may occur(2)
Repeatable Read		don't occur	    don't occur	                        may occur(2)
Serializable		don't occur	    don't occur	                        don't occur

(1): 返回修改后的值
(2): 会返回新插入的值
Serializable : 均会阻塞事务2的更新, 直到事务1提交
  • mysql的锁
  • spring的事务传播级别, 实际上就是多个方法调用, 发生多个事务交叉时, 如何选择当一个事务提交或者回滚后对另一个事务的影响
  • mybatis, 如果mapper不用interface, 用class可以注入吗

Exception: Spark driver throw CommonTree ClassNotFoundEx, Executor throw block not have enough number of replicas

记录一下这个排查了挺久的生产hive集群跑批问题, 使用场景是hive on spark, hive版本是2.2, 使用的原生社区的版本, spark版本是1.6, 使用的是cdh的spark.
大致场景是hive on spark每天晚上跑批的时候, 集群高峰时段偶尔会有任务卡主, 然后超时被我们的调度系统杀掉, 影响跑批的进度

  • 现象
  1. yarn logs收集到driver端的日志, 报错是 commonTree classNotFound exception
    1
    2

然后因为有部分task没有完成, 导致下一个stage没能开始, 这个job就一直卡住了.

  1. 同一时间在executor上面报了如下异常, executor日志如下, 这个日志跟上面driver端的时间不一样, 因为这后面写这篇文章的时候, 截图比较乱了, 没有找到完整一个job的所有异常截图.
    图3

因为hdfs写文件的时候, 当client把最后一个block 提交到dn之后, 最后通过 DFSInputStream.close() 去关闭连接, 会轮训请求nn 进行一系列的检查, 包括文件副本最小数必须大于1(默认配置), 否则抛出异常给client.
同时namenode日志如下:
4

  1. datanode的日志还需找下, 记得是同时报了管道破裂等IO错误.
  • 解决过程
  1. 关于第一个现象, 一开始只发现了第一个现象, 没有发现executor的异常, 所以自然是觉得jar包问题, 尝试了如下几种添加jar包的方式:
  • hive添加配置: Add Jar(一开始不懂, 应该在spark端添加jar)
  • spark依赖第三方jar, 将jar包放到driver和executor的container中, 前两个参数会从container中取jar包, 第三个参数会把jar包加载到container中.
hive -hiveconf spark.driver.extraClassPath=./antlr-runtime-3.4.jar  -hiveconf spark.executor.extraClassPath=./antlr-runtime-3.4.jar -hiveconf spark.yarn.dist.files=/opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/jars/antlr-runtime-3.4.jar

并且打印executor和driver加载的class, 仍然没发现加载了commonTree这个class, 且仍然报错

spark.executor.extraJavaOptions=-verbose:class
spark.driver.extraJavaOptions=-verbose:class

期间还设置了eventLog, 以上方法都没有解决这个问题, 后面发现了现象2.

  1. 针对executor写datanode报错, IO层面的异常就比较难排查了, 暂且想到了几个优化集群的方案.
  1. 根据图3的异常, 看了下源码, completeFile的时候会检查block的最小副本数是否达到, 客户端会轮询等待nn, 根据后续block 结束completeFile的时间(大概有二十几秒), 增加了retry次数之后, 后续的达不到最小副本数的异常有所减小,
    image

但是仍然出现异常, 故想到写pipeline的dn失败时,重试其他dn, 故找到以下配置:
image

dfs.client.block.write.replace-datanode-on-failure.policy, 这个配置的解释一开始没看懂n这个参数( let n be the number of existing datanodes), 后面参考了这篇文章, http://blog.cloudera.com/blog/2015/03/understanding-hdfs-recovery-processes-part-2/, 应该是集群中现有的dn的数量,如果是default则按照公式进行计算, 如果是always, 每次都新加一个dn到pipeline中.

dfs.client.block.write.replace-datanode-on-failure.best-effort, 再说一下这个配置, 一开始也是没看懂, 也是根据"understanding-hdfs-recovery-processes-part-2", 这篇文章才理解的, 假设这个参数是false, 如果作为replacement的dn也写失败的话就会直接抛出异常, 终止重试; 如果设为true, 则假设replacement的dn也写失败, 仍然会找新的dn去重试.

所以我们想要的是反复重试新的dn, 直到客户端发起completeFile请求时, 轮询nn超时, 故把dfs.client.block.write.replace-datanode-on-failure.policy设置为always, dfs.client.block.write.replace-datanode-on-failure.best-effort设为true, namenode的block state change的日志级别调成debug, 再观察后续出现写异常的时候是否有重试其他dn.

learning JVM

运行时数据区域

程序计数器(programme counter register)

若执行的是非native方法, 则保存下条指令的地址; 若是native方法, 则为空;每个线程独有, 互不影响

虚拟机栈(virtual machine stacks), 本地方法栈(native method stack)

每个线程独有, 每个方法创建的时候都会创建一个栈帧(stack frame),用于存储方法的局部变量, 操作数栈等.
, 虚拟机栈和本地方法栈的不同是,前者执行java方法, 后者执行native方法

  • StackOverFlowError
    线程请求的栈深度大于虚拟机允许的栈深度, 抛出该异常
  • OutOfMemoryError
    如果虚拟机栈可以动态扩展,但是扩展时无法申请到固定的内存,会抛出OutOfMemoryError

java 堆(heap)

存放对象的实例和数组, 所有线程所共有; 如果堆中没有内存完成实例的分配, 并且堆也无法再扩展时,抛出 OutOfMemoryError

方法区(Method Area)

线程间共享, 存储每个类的结构,包括运行时常量 (包括string pool) ,静态变量,即时编译器编译后的代码等数据

本地内存(native memory, C heap)

  1. 管理java heap的状态数据(用于GC);
  2. JNI调用,也就是Native Stack;
  3. JIT(即使编译器)编译时使用Native Memory,并且JIT的输入(Java字节码)和输出(可执行代码)也都是保存在Native Memory;
  4. NIO direct buffer。对于IBM JVM和Hotspot,都可以通过-XX:MaxDirectMemorySize来设置nio直接缓冲区的最大值。默认是64M。超过这个时,会按照32M自动增大。
    DirectBuffer访问更快,避免了数据从heap memory 拷贝到本地堆。DirectBuffer byte array 实际是保存在native heap中,但是操作该byte array的对象保存在java heap中。
    GC时不会直接回收native memory, 通过释放heap memory中的对象来释放native memory, 但是通常java heap没达到gc 的条件.
  5. 对于IBM的JVM某些版本实现,类加载器和类信息都是保存在Native Memory中的。
  • tips
    • 分配内存的时候优先给heap memory 分配, 再到native memory

虚拟机对象

  • 创建
    每个线程分配一块独立的内存,本地线程分配缓冲(Thread local allocation buffer),来控制给每个对象分配内存时是线程安全的

  • 对象的内存布局
    对象头, 实例数据, 对齐填充

  • 对象的访问
    sun hotspot通过直接指针的方式, reference存储了对象的地址,存储在栈区(应该指的是虚拟机栈),直接访问到堆中的对象的数据,对象的数据中包含类的信息.

内存泄漏原因

  • StackOverFlowError

有可能是栈的深度超过最大深度, 也有可能是栈区的内存大小不足, 实质应该是一样的, 可以通过增加栈区的内存大小(-Xss).
操作系统分配个每个进程的内存是固定的 , windows下32位每个进程最大内存是2GB, 减去堆(-Xmx 最大堆容量) ,方法区(-MaxPermSize 最大方法区容量),程序计数器所占内存太小忽略不计, 剩下的就是栈区(包括虚拟机栈和本地方法栈)

  • OutOfMemoryError

可以参考自

  1. Java内存溢出(OOM)异常排查指南
  2. 深入解析OutOfMemoryError
  1. Exception in thread "main" java.lang.OutOfMemoryError: Java heap space

堆内存溢出, 超过设置的最大堆的大小,则报该异常, 直接原因就是因为没有内存分配给新的对象了, 间接原因可以说是gc时应该清除的对象没有被清除, 某些地方仍然保留有该对象的引用.

  1. 因为jvm会尽量保持堆是初始化的大小, 所以设置最大堆(-Xmx) 和堆的初始化值(-Xms)一样, 以减小gc 频率
  2. heap dumps

反映对象的数量和类文件所占用的字节数, -XX:+HeapDumpOnOutOfMemoryError, 在快oom的时候打印日志.

  1. 堆转储分析:live objects

使用jmap并且加上-histo参数可以为你产生一个直方图,它显示了从程序运行到现在所有对象的数量和内存消耗,并且包含了已经被回收的对象和内存。如果使用-histo:live参数会显示当前还在堆中得对象数量及其内存消耗,不论这些对象是否要被垃圾搜集器进行回收。

  1. 堆转储分析:跟踪引用链

浏览堆转储引用链具有两个步骤:首先需要使用-dump参数来使用jmap,然后需要用jhat来使用转储文件。查看对象的引用链

  1. 堆转储分析:内存分配情况

可以找到对象使用的情况, 以及这些对象的引用被哪里的代码使用的, 但是有时候这种方式还是不够的, 例如string对象会很多,

  • Exception in thread "main" java.lang.OutOfMemoryError: PermGen space

存储类的内存区域出现内存泄漏, 可能出现在应用被频繁部署的时候, 某些类占的内存区域没有被释放, 解决永久代错误的第一个方法就是增大永久大的空间,你可以使用-XX:MaxPermSize命令行参数。默认是64M,但是web应用程序或者IDE一般都需要256M。

解决永久代的问题通常都是比较痛苦的。一般可以先考虑加上-XX:+TraceClassLoading和-XX:+TraceClassUnloading命令行选项以便找出那些被加载了但是没有被卸载的类。如果你加上了-XX:+TraceClassResolution命令行选项,你还可以看到哪些类访问了其他类,但是没有被正常卸载。

  • Exception in thread "main" java.lang.OutOfMemoryError: Direct buffer memory

native memory不够用了, 见"深入理解JVM" 83页: 明显的特征, heap dump文件中不会看见明显的异常, dump文件很小, 如果程序中使用了大量的NIO, 需要考虑是否是这个问题导致.

  • Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread

线程太多了, 实际应该不需要, 因为可能大多数线程在等待, 而且需要频繁切换, 可以换成aio 或者nio

java heap分析工具

jmap -dump:format=b,file-fileName pid
// 然后用 Eclipse memory analyzer 打开

问题

  1. linux内存分配策略, 每个进程如何分配内存, 和windows有何不同

三. 垃圾回收

可达性分析

若不存在从对象到GC root的引用链, 则在下次gc时, 该对象会被回收, 下次gc可能是minor gc(日志中称作 gc, 发生在新生代 ), major gc(日志中称作 full gc, 发生在老年代)

  • 回收方法区(永久代)
    主要针对类的回收,满足以下三个条件:
  1. 类的所有实例已被回收
  2. 加载该类的classloader已被回收
  3. 该类的java.lang.class对象没有任何地方被引用, 不存在反射访问该类的方法.

引用

  • 强引用
    只要强引用存在, 则永远不会被回收
 Obj a = new Obj();
  • 软引用
    SoftReference来实现, 除非内存准备溢出了, 不然不会被回收.

  • 弱引用
    WeakReference的对象, 若只被弱引用引用, 不被其他任何强引用引用时, 如果GC运行, 那么该对象就会被回收.例子 ThreadLocal中的Entry 持有对ThreadLocal对象的弱引用, 若所有使用该ThreadLocal的线程均退出,

  • 虚引用

DirectByteBuffer

垃圾收集策略

  • 标记-清除算法

需要回收的对象进行一次标记,标记完成后统一回收

  • 缺点

    • 标记和回收的效率都不高
    • 回收后产生大量内存碎片,以至于分配较大对象时,无法得到连续的足够内存而导致再次进行gc
  • 标记-复制算法

将内存分配为一块eden和两块survivor, 比例是8:1, 每次使用新生代内存的90%, gc时将存活对象复制到空闲的survivor, 剩余对象一次性清理

  • 缺点

    • 不适用对象存活率较高的情况,复制的对象太多, 例如老年代
  • 标记整理算法
    将可用的对象向一端移动,然后清理掉边界以外的内存,有效避免了内存碎片(针对标记清除算法)和需要有一部分空间来作为留存空间(针对标记-复制算法).

  • 分代收集算法
    对新生代和老年代采用不同的垃圾收集算法, 例如HotSpot新生代采用标记-复制, 老年代采用标记整理

垃圾收集器

  • Serial

单线程收集器, 不只是说只会使用一个cpu或者一个线程去收集, 而是指GC时会暂停其他工作线程,但省去了线程切换的开销, 在client 模式下, heap是几百兆的情况, gc效率很高, 造成的停顿仅仅是几百毫秒, 是client端默认的GC收集器

  • ParNew(parallel 并行)

是Serial 的多线程版本, 作为新生代的默认收集器, 只能与CMS联合使用

  • Parallel Scavenage

新生代收集器, 注重控制吞吐量来控制GC的停顿时间, 虚拟机运行100分钟, GC一分钟, 吞吐量 99%
重要参数 : -XX:MaxGCPauseMills , -XX:GCTimeRatio, -XX:UseAdaptiveSizePolicy(自适应调节) 与ParNew的最大区别

  • Serial Old

是Serial 的老年代版本, 与Parallel Scavenage配合使用, 作为CMS的备案, 在发生concurrent mode failure使用

  • Parallel Old

是Parallel Scavenage的老年代版本, 可与Parallel Scavenage 配合使用

  • CMS(Concurrent Mark Sweep)

主要是将回收时间降至最短,基于标记-清除算法,

  • 缺点

    • 收集阶段, 占用一部分线程和cpu资源, 导致吞吐量下降
    • 无法处理浮动垃圾, 因为gc时用户线程继续产生新对象, 所以需要预留至少一部分空间用作gc时新对象的产生(-XX: CMSInitiatingOccupancyFraction 设置百分比), 若这部分空间不能满足, 则会导致concurrent mode failure, 导致启用备份的Serial Old收集器, 停顿会很长.
    • 采用标记清除算法, 内存碎片很多,-XX:UseCMSCompactionAtFullCollection(默认开启, 开启内存碎片的整理工作), 但是会导致停顿时间变长, 也可采用 -XX:CMSFullGCsBeforeCompaction(表示执行了多少次不压缩的full GC后, 来一次压缩的full GC ,默认是0, 表示每次都压缩).
  • Garbage First(G1)

    • 并行(多线程处理GC, 此时用户线程仍然等待)和并发(减少停顿时间, 和用户线程并行, 垃圾收集器在另一个CPU)
    • 分代收集
    • 空间整合
      整体使用标记-整理, 局部采用标记-复制,故不会有内存碎片.
    • 可预测的停顿

问题

  • jvm的高并发的并行程度如何计算, 基于cpu的核数? 还是? 最大并行程度受限于怎么样的硬件指标

java heap 分代(基于jdk1.8)

  • 新生代(PSYoungGen)

    eden space , survivor(from space , to space) ,可设置比例,采用标记复制算法

  • 老年代(ParOldGen)

采用标记清除算法或标记整理算法

  • Metaspace(方法区)

存储类的信息

GC回收过程

  • 对象优先在eden分配, 当eden没有足够空间时, 进行一次minorGC, 将存在GC root引用链的对象复制到survivor区域, 使用标记复制算法, 然后存活对象的年龄加一.
  • 大对象直接分配到老年代
    当对象大小大于参数设置的 -XX:PretenureSizeThreshold (默认是 0 , 即无论多大不会直接进入老年代)
  • 长期存活对象进入老年代
    当对象出生在eden, 经过一次minorGC进入survivor, 则对象年龄设为1, 当对象年龄超过该参数 -XX:MaxTenuringThreshold 默认15, 则进入老年代
  • 动态对象年龄判定
    当某个年龄的对象的大小总和超过survivor的一半, 则大于或等于该年龄的所有对象都会进入老年代.
  • 空间分配担保
    minorGC之前会检查老年代最大的连续内存空间是否大于新生代所有对象的大小, 或者检查最大的老年代的连续内存是否大于历史平均进入老年代的对象大小, 满足一个就进行minorGC, 否则fullGC

无论是Minor GC还是CMS GC,都会’Stop-The-World’,即停止用户的一切线程,只留下gc线程回收垃圾对象。其中Minor GC的STW时间主要耗费在复制阶段,CMS GC的STW时间主要耗费在标示垃圾对象阶段

GC调优案例

  • 将SurvivorRatio由默认的8改为2

使surivor的比例增大, eden: surivor1: surivor2的比例为 2:1:1, 增大了surivor, 减小了eden, 影响是: minorGC的频率增大, 因为eden小了; 增大了新生代对象复制的开销, 因为有更多的对象会留在 surivor区域, 但是提高了晋升老年代的门槛, 让新生代对象能进行充分的淘汰才能进入老年代, 使真正的长寿的对象才能进入老年代, 使fullGC的时间变短了.

  • NewParSize调优

NewParSize表示新生代大小, 增大新生代大小, 则单次minorGC时间变长, 频率下降, 业务读写操作时间抖动较大; 减小新生代大小, 就会使minorGC频率加快, 加快晋升到老年代的速度(因为每minorGC一次, 对象年龄加一), 增加fullGC的机会

hotSpot的算法实现

  • 什么时候开始GC
    当eden和一个survivor的空间容不下新的对象时,产生minorGC,将长期存活对象移到老年代, 若老年代的空间不够, 则进行fullGC
  • GC root
  1. local variable
  2. active java thread
  3. static variable
  4. JNI reference
  • 寻找GC Root的引用链

    • 逐个检测(包括方法区), 花费很多时间
    • GC 产生停顿, 保证引用关系不变
    • 解决办法: 减小时间, 类设置OOP map, 记录引用位置
  • 但是导致引用变化的指令可能非常多, 可能导致 OOP Map的所占空间巨大

    • 解决办法:在程序需要长时间执行的地方设置safepoint, 并生成OOP map, 程序跑到safepoint才GC, 并在safepoint设置标志, 跑到这个safepoint的时候判断有没有gc, 若线程处在block等状态, 在一段引用关系不变的代码段设置safeRegion, 随时可以GC

开发中的GC优化

  1. 尽量少使用临时对象, 局部变量尽量使用基本数据类型, 也可以避免装箱; 用StringBuffer, 不用string做累加.

StringBuffer是线程安全的; StringBuilder 不是线程安全的(所以内部没有一个缓存的数组), 适合单线程快速使用后丢弃,

  1. 对象不用时显式置为null
  2. 尽量少用静态对象变量

static变量被class 引用, class被classloader引用, 除非classloader is reloaded, 例如webapp reload, 否则static变量不会被垃圾回收.

// 如果只是想临时用一下static, 可以用static block, 在block结束之后, 就会被GC; 或者在static reference不使用之后, 显式赋为null
class MyUtils {
   static
   {
      MyObject myObject = new MyObject();
      doStuff(myObject, params);
   }

   static boolean doStuff(MyObject myObject, Params... params) {
       // do stuff with myObject and params...
   }
}

类加载机制

参考自 link

  • 类加载的步骤
  1. 加载

分为预加载和运行时加载,

预加载, 虚拟机启动的时候加载rt.jar的class, 像java.lang.、java.util.、java.io.*等等, 可以设置虚拟机参数 -XX+TraceClassLoading 来获取类加载信息

运行时加载: 在用到一个class文件的时候, 如果内存中没有则按类的全限定名来加载.

加载阶段:
> 1. 获取class文件的二进制流 , 例如从zip包中获取,这就是以后jar、ear、war格式的基础
从网络中获取,典型应用就是Applet
运行时计算生成,典型应用就是动态代理技术
由其他文件生成,典型应用就是JSP,即由JSP生成对应的.class文件
从数据库中读取,这种场景比较少见
> 2. 将类信息, 静态变量, 字节码, 常量等内容放到方法区
> 3. 内存中生成java.lang.Class的对象, 作为访问入口

  1. 验证

这个地方要说一点和开发者相关的。.class文件的第5~第8个字节表示的是该.class文件的主次版本号,验证的时候会对这4个字节做一个验证,高版本的JDK能向下兼容以前版本的.class文件,但不能运行以后的class文件(向后兼容),即使文件格式未发生任何变化,虚拟机也必须拒绝执行超过其版本号的.class文件。举个具体的例子,如果一段.java代码是在JDK1.6下编译的,那么JDK1.6、JDK1.7的环境能运行这个.java代码生成的.class文件,但是JDK1.5、JDK1.4乃更低的JDK版本是无法运行这个.java代码生成的.class文件的。如果运行,会抛出java.lang.UnsupportedClassVersionError,这个小细节,务必注意。

  1. 准备

为类变量(static 变量, 不是实例变量)分配内存并设置其初始值, 均在方法区分配

这个阶段赋初始值的变量指的是那些不被final修饰的static变量,比如”public static int value = 123;”,value在准备阶段过后是0而不是123,给value赋值为123的动作将在初始化阶段才进行;比如”public static final int value = 123;”就不一样了,在准备阶段,虚拟机就会给value赋值为123。

  1. 解析

将符号引用替换为直接引用的过程,

符号引用, 包括: 类和接口的全限定名; 字段的名称和描述符; 方法的名称和描述符

例如下面这串代码:

package com.xrq.test6;
 
public class TestMain
{
    private static int i;
    private double d;
     
    public static void print()
    {
         
    }
     
    private boolean trueOrFalse()
    {
        return false;
    }
}

用javap把这段代码的.class反编译一下:

Constant pool:
   #1 = Class              #2             //  com/xrq/test6/TestMain
   #2 = Utf8               com/xrq/test6/TestMain
   #3 = Class              #4             //  java/lang/Object
   #4 = Utf8               java/lang/Object
   #5 = Utf8               i
   #6 = Utf8               I
   #7 = Utf8               d
   #8 = Utf8               D
   #9 = Utf8               <init>
  #10 = Utf8               ()V
  #11 = Utf8               Code
  #12 = Methodref          #3.#13         //  java/lang/Object."<init>":()V
  #13 = NameAndType        #9:#10         //  "<init>":()V
  #14 = Utf8               LineNumberTable
  #15 = Utf8               LocalVariableTable
  #16 = Utf8               this
  #17 = Utf8               Lcom/xrq/test6/TestMain;
  #18 = Utf8               print
  #19 = Utf8               trueOrFalse
  #20 = Utf8               ()Z
  #21 = Utf8               SourceFile
  #22 = Utf8               TestMain.java

看到Constant Pool也就是常量池中有22项内容,其中带”Utf8″的就是符号引用。比如#2,它的值是”com/xrq/test6/TestMain”,表示的是这个类的全限定名;又比如#5为i,#6为I,它们是一对的,表示变量时Integer(int)类型的,名字叫做i;#6为D、#7为d也是一样,表示一个Double(double)类型的变量,名字为d;#18、#19表示的都是方法的名字。
那其实总而言之,符号引用和我们上面讲的是一样的,是对于类、变量、方法的描述。符号引用和虚拟机的内存布局是没有关系的,引用的目标未必已经加载到内存中了。

直接引用: 直接引用可以是直接指向目标的指针、相对偏移量或是一个能间接定位到目标的句柄。直接引用是和虚拟机实现的内存布局相关的,同一个符号引用在不同的虚拟机示例上翻译出来的直接引用一般不会相同。如果有了直接引用,那引用的目标必定已经存在在内存中了。

  1. 初始化

初始化过程是执行一个类的构造器()方法的过程, 其实就是给static变量赋予用户指定的值以及执行静态代码块, 虚拟机会保证类在多线程环境下正确的被初始化并同步, 在同一个类加载器下, 一个类只会初始化一次.

以下几种场景, 类会被正常初始化

1、使用new关键字实例化对象、读取或者设置一个类的静态字段(被final修饰的静态字段除外)、调用一个类的静态方法的时候

2、使用java.lang.reflect包中的方法对类进行反射调用的时候

3、初始化一个类,发现其父类还没有初始化过的时候

4、虚拟机启动的时候,虚拟机会先初始化用户指定的包含main()方法的那个类


  • 除了上面4种场景外,所有引用类的方式都不会触发类的初始化,称为被动引用,接下来看下被动引用的几个例子:

1、子类引用父类静态字段,不会导致子类初始化。至于子类是否被加载、验证了,前者可以通过”-XX:+TraceClassLoading”来查看

public class SuperClass
{
    public static int value = 123;
     
    static
    {
        System.out.println("SuperClass init");
    }
}
 
public class SubClass extends SuperClass
{
    static
    {
        System.out.println("SubClass init");
    }
}
 
public class TestMain
{
    public static void main(String[] args)
    {
        System.out.println(SubClass.value);
    }
}
运行结果为

SuperClass init

2、通过数组定义引用类,不会触发此类的初始化

public class SuperClass
{
    public static int value = 123;
     
    static
    {
        System.out.println("SuperClass init");
    }
}
 
public class TestMain
{
    public static void main(String[] args)
    {
        SuperClass[] scs = new SuperClass[10];
    }
}

3、引用静态常量时,常量在编译阶段会存入类的常量池中,本质上并没有直接引用到定义常量的类

public class ConstClass
{
    public static final String HELLOWORLD =  "Hello World";
     
    static
    {
        System.out.println("ConstCLass init");
    }
}
 
public class TestMain
{
    public static void main(String[] args)
    {
        System.out.println(ConstClass.HELLOWORLD);
    }
}
运行结果为
Hello World

在编译阶段通过常量传播优化,常量HELLOWORLD的值”Hello World”实际上已经存储到了NotInitialization类的常量池中,以后NotInitialization对常量ConstClass.HELLOWORLD的引用实际上都被转化为NotInitialization类对自身常量池的引用了。也就是说,实际上的NotInitialization的Class文件中并没有ConstClass类的符号引用入口,这两个类在编译成Class之后就不存在任何联系了。

  • 类与类的加载器
    只要当两个类来自同一个class文件,被同一个虚拟机加载,类加载器相同, equals(), isAssignableFrom(), instanceof 才能返回两个类相等.

  • 双亲委派模型(parents delegation model)
    当一个类加载器收到了类加载的请求, 首先把请求委派给父类加载器执行, 所以所以的加载请求都会首先传递到顶层的启动类加载器, 当父类无法加载时,子加载器才会尝试自己加载

    • 加载器的层次关系
      Bootstrap ClassLoader -> Extension ClassLoader -> Application ClassLoader -> User ClassLoader
  • classpath
    可以参考honghailiang888, 非常齐全

  • 如何手动编译并运行 java文件

    class文件发现规则:class文件所在目录 = classpath + '' + 包名中的'.'全变成'', 一般会把运行java, javac程序的当前目录(.)也加入到classpath中, 然后会遍历所有的classpath, 在每个classpath下面找 包名+类名 对应的class文件.

    • 编译java

    例如项目的目录结构如下:

    
    D:/src/main/java/
                  packageA/A.java
                  packageB/B.java

import packageB.B;

public class A{
    
    // do something
}

当需要在任意目录编译 A.java时, 需要知道所引用的B.java的位置, 假设运行javac的目录为D: , 因为当前目录是D:, 在当前目录下用包名无法找到B.java, 故需要手动指定额外的classpath, 则会在packageA和packageB生成各自的class文件.

    javac -classpath src/main/java src/main/java/packageA/A.java
    
  • 运行java
    也需要通过classpath 找到对应的class文件, 并且需要指定 包名.类名 , 项目结构如之前所示, 在D: 下运行java,
    java -classpath src/main/java packageA.A
    或者
    在linux 下面想跑一个class文件, 全限定类名为: packageA.packageB.A
    在需要将目录结构新建为: packageA/packageB/A.class
    在packageA的父目录执行 java packageA.packageB.A 即可, 若需要可以指定当前路径为 classpath, 命令如下: 
    java -classpath . packageA.packageB.A
  • GC 调优

http://hbasefly.com/2016/08/09/hbase-cms-gc/

jvm常用命令

  • -verbose:class , -verbose:gc ,-verbose:jni

https://dzone.com/articles/how-use-verbose-options-java
-verbose:class is used to display the information about classes being loaded by JVM. This is useful when using class loaders for loading classes dynamically or for analysing what all classes are getting loaded in a particular scenario.

问题

  1. spring是如何运行起来的, 并维持程序一直运行, 不结束

spring notes

  • spring的理解

    spring是以IOC和AOP为核心的众多组件的总称, 其他还包括spring-boot, spring-mvc等组件, 涵盖了后台开发的从前端到后台交互, 后台分层, 中间件, 数据库等持久层等几乎方位的支持.

spring IOC

  • 大致**
    首先spring的控制反转来源于依赖倒置(Dependency Inversion Principle )的**,
    传统的依赖顺序是高层依赖底层, 而依赖倒置指的是底层依赖高层. 例如车子的建造, 车子依赖车身, 车身依赖底盘, 底盘依赖车轮, 若需要改动车轮的规格, 则以上所有东西都需要改, 若反过来, 先定好车子的样子, 车身 -> 车子, 底盘 -> 车身, 轮子 -> 底盘, 要改车轮, 不会影响其他.

  • 概念关系
    依赖倒置是**, 控制反转是实现该**的一个思路, 依赖注入是该思路的具体实现方法, 依赖注入可以简要说是把底层类作为参数传递给上层类, 实现上层对下层的控制, 而IOC容器则封装了底层类的构造方法, 不需要知道底层类的实现细节以及 具体的构造函数的如何调用, 只需要按照依赖注入即可.


以下内容均参考 spring-framework-reference/core/beans-annotation-config

  • xml和annotation两种方式比较

    annotation使配置更简洁, 和源码放在一起, 更精确, 但是配置比较分散, 不在一个地方很难控制 ; xml的方式不需要去动源码, 所以不需要重新编译,

  • @required@Autowired 比较

    前者如果没有注入成功, 会报错, 且只能用于setter method.; 后者可以设置是否required(默认true) ,并且当无法autowire时会自动忽略, 适用于constructor, field , setter method or config method.

  • @Autowired

    按照byType注入, 可以设置required=false, 找不到bean时也不报错, 后续可能报NPE.

    • 用于constructor, 当构造函数有多个时, 可以将指定一个进行autowired, 否则将默认以贪婪模式(参数最多的构造函数进行构造).

    • 获取所有类型的bean

    It is also possible to provide all beans of a particular type from the ApplicationContext by adding the annotation to a field or method that expects an array of that type:

    public class MovieRecommender {
    
    @Autowired
    private MovieCatalog[] movieCatalogs;
    
    }
    
    public class MovieRecommender {
    
    private Set<MovieCatalog> movieCatalogs;
    
    @Autowired
    public void setMovieCatalogs(Set<MovieCatalog> movieCatalogs) {
        this.movieCatalogs = movieCatalogs;
    }
    
    // ...
    }
    
  • @qualifier

    可以结合@autowire使用, 当某个type有多个实例时, 可以用@qualifier 指定某个id的bean注入. @qualifier@Autowired结合使用时, 和@resource几乎等同.

  • @resource

    既可以byName, 也可以byType注入, 默认byName, 就是按照bean的id注入.

    • 装配顺序
    1. 如果同时指定了name和type,则从Spring上下文中找到唯一匹配的bean进行装配,找不到则抛出异常

    2. 如果指定了name,则从上下文中查找名称(id)匹配的bean进行装配,找不到则抛出异常

    3. 如果指定了type,则从上下文中找到类型匹配的唯一bean进行装配,找不到或者找到多个,都会抛出异常

    4. 如果既没有指定name,又没有指定type,则自动按照byName方式进行装配;如果没有匹配,则回退为一个原始类型进行匹配,如果匹配则自动装配;

    问题 在细看一下@qualifier

  • @component@repository@service@controller

    如果 Web 应用程序采用了经典的三层分层结构的话,最好在持久层、业务层和控制层分别采用上述注解对分层中的类进行注释。@service用于标注业务层组件.@controller用于标注控制层组件(如struts中的action).@repository用于标注数据访问组件,即DAO组件. @component泛指组件,当组件不好归类的时候,我们可以使用这个注解进行标注。

    • @bean@component的区别

      前者是显式的返回一个object, 然后注入到application context, 让spring代管理, 例如可以适用于第三方的lib, 无法再源码上@component; 后者是让spring创建并管理, 隐式的创建.

classpath*: / 与 classpath:/

  • classpath*:/ 表示在此classpath(当前项目的classpath及类路径的classpath)下的所有兄弟和子孙路径的 classpath下满足要求的所有文件
  • classpath:/ 表示classpath下的第一层子路径的第一个符合要求的文件

build project

  • 新建maven project ,选择archetype 为webapp或site,例如 module 名为 test,module file location 和content root 均为test路径

  • web项目下相对路径和绝对路径:

    • "." 代表目前所在的目录。
    • ".." 代表上一层目录。
    • "/" 代表根目录。

前后台传输数据

  1. 前台需要把对象通过JSON.stringfy转换json字符串,后台在参数对应加上@requestbody,并且Request的content-type必须为json,不然会出现415的错误.
  1. 后台传输少量数据到前端,避免新增类文件的做法:
Map<String,Object> map = new HashMap<String,Object>();
map.put("leader",ifLeader);
map.put("teamId",teamId);
jsonResult.setData(new JSONObject(map));
  1. 日期类参数
  • 如果controller的dto和后台数据映射dto为同一个,且日期类使用java.util.Date作为转换类型,则在该参数上加两个标签,省掉格式转换和非空判断,@DateTimeFormat为接受前台数据的转换,@JSONField为后台给前台返回数据的转换。
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@JSONField(format="yyyy-MM-dd HH:mm:ss")
Date startTime
  • spring-servlet.xml需要加以下配置
<mvc:annotation-driven>
        <mvc:message-converters register-defaults="true">
        <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter">
            <property name="supportedMediaTypes" value="application/json"/>
            <!--设置fastjson特性-->
            <property name="features">
                <array>
                    <!--设置null值也要输出,fastjson默认是关闭的-->
                    <value>WriteMapNullValue</value>
                    <!--设置使用文本方式输出日期,fastjson默认是long-->
                    <value>WriteDateUseDateFormat</value>
                </array>
            </property>
        </bean>
        </mvc:message-converters>
    </mvc:annotation-driven>

mybatis返回map

mapper:

<resultMap id="teamResultMap" type="com.mucfc.result.QueryTeamResult">
    <id column="id" property="teamId" jdbcType="INTEGER" javaType="int" />
    <result column="name" property="teamName"  jdbcType="VARCHAR" javaType="String"/>
</resultMap>
<select id="queryTeam"  resultMap="teamResultMap"  parameterType="int" >
    select id,name  from tb_dept WHERE parent= #{level}
</select>

// daoMybatis:
public List<QueryTeamResult> queryTeam(int level){
    return selectList("WeeklyMapper.queryTeam",level);
}

jetty启动时修改资源文件,可以通过update更新

修改/${jetty_home}/etc/webdefault.xml 将true改为false
并在pom文件中配置

<init-param>
  <param-name>useFileMappedBuffer</param-name>
  <param-value>false</param-value>
</init-param>
pom.xml
<plugin>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-maven-plugin</artifactId>
  <configuration>
  <webAppConfig>
  <contextPath>/{project.build.finalName}</contextPath>
  </webAppConfig>
  <scanIntervalSeconds>3</scanIntervalSeconds>
  <httpConnector>8080</httpConnector>
  <webAppXml>jetty.xml</webAppXml>
  <webApp>
  <defaultsDescriptor>webdefault.xml</defaultsDescriptor>
  </webApp>
  </configuration>
</plugin>

httpServletResquest

  • getAttribute()

An attribute is a server variable that exists within a specified scope i.e.:
application: available for the life of the entire application
session: available for the life of the session
request: only available for the life of the request
page (JSP only): available for the current JSP page only

  • getParameter()

returns http request parameters. Those passed from the client to the server. For example http://example.com/servlet?parameter=1.
Can only return String

spring @value

  • static field cant inject directly.
private static String test;
@Value("${test.val}")
public void setTest(String a){
    test = a;
}
  • #{configProperties.configVal} , ${configVal} 区别
    link

  • 其他用法
    link

mybatis

  • 搞清楚mybatis ${var}和#{var}的区别

Boolean 值传递

数据库类型是tinyInt, dto字段类型是Boolean或boolean,

<if test="filedName != null">
 #{fieldName}
</if>
 // 不要加非空字符串判断

spring AOP

代理技术可以说是封装了切面类(功能加强类)的逻辑和目标类的逻辑, 解耦两者的代码, 让功能加强类的逻辑能无缝插入目标类, 不产生浸入式代码.
因为以下两种原生代理, 存在以下几个问题: 
1. 不能对目标类的某些方法进行选择后进行切面
2. 还是通过硬编码的方式在代理类中, 在目标方法前后进行切面
3. 对每个类都要创建代理类, 无法实现通用
因此spring aop应运而生. 
  • pointcut 切点

    指定在哪些类和哪些方法做切入

  • advice 增强

    定义加强类的逻辑, 方法前还是方法后等

  • advisor
    切面, 将pointcut和advice结合起来

  • JDK动态代理
    只能对接口或者实现类进行方法切面,
    问题
    为什么只能基于接口做代理.

  • CGLib代理

    采用动态创建子类的方法, 因此不能对final, private等方法进行切面; 而且相比于JDK性能比较高, 创建代理对象的时间比较长, 适用于spring 多数类都是单例的情况.

  • 示例代码

// maven dependency
<!-- AspectJ dependencies -->
		<dependency>
			<groupId>org.aspectj</groupId>
			<artifactId>aspectjrt</artifactId>
			<version>${aspectj.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.aspectj</groupId>
			<artifactId>aspectjtools</artifactId>
			<version>${aspectj.version}</version>
		</dependency>
		
		
// spring application.xml config
<!-- Enable AspectJ style of Spring AOP -->
<aop:aspectj-autoproxy  />
// 下面这种方式用于CGLIB, 可以切面类的非接口方法, 否则上面那种基于JDK的代理只能切接口方法
<aop:aspectj-autoproxy  proxy-ttarget-class="true" />

@Aspect
// 注意需要依赖注入
@Component
public class EmployeeAspect {

	// 注意方法()内有两个点, 代表任意参数
	@Pointcut("execution(* com.journaldev.spring.service.*.get*(..))")
	public void pointCut(){
		System.out.println("enter pointcut");
	}
	
	@Before("pointCut()")
	public void before(){
		
	}
}
  • 切面进入的条件
    public , non static , non final ,called by class outside(非必须)
    若要在类的内部调用中执行切面, 例如类里面有两个方法A, B, 在A调用B的时候进入切面, 则需要如下写法:
    // 注意该类需要由spring管理,并生成, 若是在外部 Test t = new Test(); t.methodA();或者 t.methodB(); 均不能进入切面
    @Component
    public class Test{
    @Resource Test test
        public void methodA(){
            // something
            test.methodB();
            // something
        }
        
        public void methodB(){}
        
    }

spring事务


  • 数据并发问题
  • 脏读(dirty read), A事务读到了B事务尚未提交的数据, 可理解为读到了脏数据, 若此时B事务回滚, 则会产生数据不一致的情况.
Transaction 1	                                          Transaction 2
/* Query 1 */
SELECT age FROM users WHERE id = 1;
/* will read 20 */
                                                        /* Query 2 */
                                                        UPDATE users SET age = 21 WHERE id = 1;
                                                        /* No commit here */
/* Query 1 */
SELECT age FROM users WHERE id = 1;
/* will read 21 */
                                                        ROLLBACK; /* lock-based DIRTY READ */

  • 不可重复读(unrepeatable read)
    因为select可能不需要锁, 或者发生在两次select锁获取之前, 就会读到不可重复读取的数据, 根据不同的隔离级别, 会有不同的数据.

At the SERIALIZABLE and REPEATABLE READ isolation levels, the DBMS must return the old value for the second SELECT. At READ COMMITTED and READ UNCOMMITTED, the DBMS may return the updated value; this is a non-repeatable read.

  • 有两种策略来处理此种情况
    1. 延迟事务2的执行直到事务1提交或者回滚, 等于说维护一个时序: serial schedule T1, T2.
    2. 为了获取更好的性能,让事务2被先提交, 事务1的执行必须满足时序 T1, T2, 若不满足事务1被回滚.

在不同模型下, 结果会不一样

Using a lock-based concurrency control method, at the REPEATABLE READ isolation mode, the row with ID = 1 would be locked, thus blocking Query 2 until the first transaction was committed or rolled back. In READ COMMITTED mode, the second time Query 1 was executed, the age would have changed.

Under multiversion concurrency control, at the SERIALIZABLE isolation level, both SELECT queries see a snapshot of the database taken at the start of Transaction 1. Therefore, they return the same data. However, if Transaction 1 then attempted to UPDATE that row as well, a serialization failure would occur and Transaction 1 would be forced to roll back.

At the READ COMMITTED isolation level, each query sees a snapshot of the database taken at the start of each query. Therefore, they each see different data for the updated row. No serialization failure is possible in this mode (because no promise of serializability is made), and Transaction 1 will not have to be retried.(这段怎么跟之前说的 read commited 不一样 ? )

  • 幻象读(Phantom reads),是不可重复读的特例, 相当于是范围读.
    Transaction 1	                                        Transaction 2
/* Query 1 */
SELECT * FROM users
WHERE age BETWEEN 10 AND 30;
                                                /* Query 2 */
                                                INSERT INTO users(id,name,age) VALUES ( 3, 'Bob', 27 );
                                                COMMIT;
/* Query 1 */
SELECT * FROM users
WHERE age BETWEEN 10 AND 30;
COMMIT;


Note that Transaction 1 executed the same query twice. If the highest level of isolation were maintained, the same set of rows should be returned both times, and indeed that is what is mandated to occur in a database operating at the SQL SERIALIZABLE isolation level. However, at the lesser isolation levels, a different set of rows may be returned the second time.

In the SERIALIZABLE isolation mode, Query 1 would result in all records with age in the range 10 to 30 being locked, thus Query 2 would block until the first transaction was committed. In REPEATABLE READ mode, the range would not be locked, allowing the record to be inserted and the second execution of Query 1 to include the new row in its results.


Transaction 1	                                    Transaction 2
/* Query 1 */
SELECT * FROM users WHERE id = 1;
                                                    /* Query 2 */
                                                    UPDATE users SET age = 21 WHERE id = 1;
                                                    COMMIT; 
                                                    /* in multiversion concurrency
                                                    control, or lock-based READ COMMITTED */
/* Query 1 */
SELECT * FROM users WHERE id = 1;
COMMIT; 
/* lock-based REPEATABLE READ */

Isolation level		Dirty reads	    Non-repeatable reads	            Phantoms
Read Uncommitted	may occur	    may occur(1)	                    may occur(2)
Read Committed		don't occur	    may occur(1)	                    may occur(2)
Repeatable Read		don't occur	    don't occur	                        may occur(2)
Serializable		don't occur	    don't occur	                        don't occur

(1): 返回修改后的值
(2): 会返回新插入的值
Serializable : 均会阻塞事务2的更新, 直到事务1提交
  • mysql的锁
  • spring的事务传播级别, 实际上就是多个方法调用, 发生多个事务交叉时, 如何选择当一个事务提交或者回滚后对另一个事务的影响
  • mybatis, 如果mapper不用interface, 用class可以注入吗

(转)jvm outOfMemory troubleshoot skills

赞一个, 方法很多, 转自 "Axb的自我修养"
2.现象
线上机器部署了两个java实例,在运行几天后java开始吃swap空间,java实例的内存占用接近7G,程序响应很慢,重启后又恢复正常。线上配置的堆内存为3600M,栈大小为512k。

3.排查
首先怀疑是java heap的问题,查看heap占用内存,没有什么特殊。
jmap -heap pid
然后又怀疑是directbuffer的问题,jdk1.7之后对directbuffer监控的支持变得简单了一些,使用如下脚本:

`
import java.io.File;
import java.util.;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.
;

import com.sun.tools.attach.VirtualMachine; // Attach API

/**

  • Simple tool to attach to running VM to report buffer pool usage.
    */

public class MonBuffers {
static final String CONNECTOR_ADDRESS =
"com.sun.management.jmxremote.localConnectorAddress";

public static void main(String args[]) throws Exception {
    // attach to target VM to get connector address
    VirtualMachine vm = VirtualMachine.attach(args[0]);
    String connectorAddress = vm.getAgentProperties().getProperty(CONNECTOR_ADDRESS);
    if (connectorAddress == null) {
        // start management agent
        String agent = vm.getSystemProperties().getProperty("java.home") +
                File.separator + "lib" + File.separator + "management-agent.jar";
        vm.loadAgent(agent);
        connectorAddress = vm.getAgentProperties().getProperty(CONNECTOR_ADDRESS);
        assert connectorAddress != null;
    }

    // connect to agent
    JMXServiceURL url = new JMXServiceURL(connectorAddress);
    JMXConnector c = JMXConnectorFactory.connect(url);
    MBeanServerConnection server = c.getMBeanServerConnection();

    // get the list of pools
    Set<ObjectName> mbeans = server.queryNames(
        new ObjectName("java.nio:type=BufferPool,*"), null);
    List<BufferPoolMXBean> pools = new ArrayList<BufferPoolMXBean>();
    for (ObjectName name: mbeans) {
        BufferPoolMXBean pool = ManagementFactory
            .newPlatformMXBeanProxy(server, name.toString(), BufferPoolMXBean.class);
        pools.add(pool);
    }

    // print headers
    for (BufferPoolMXBean pool: pools)
        System.out.format("         %8s             ", pool.getName());
    System.out.println();
    for (int i=0; i<pools.size(); i++)
        System.out.format("%6s %10s %10s  ",  "Count", "Capacity", "Memory");
    System.out.println();

    // poll and print usage
    for (;;) {
        for (BufferPoolMXBean pool: pools) {
            System.out.format("%6d %10d %10d  ",
                pool.getCount(), pool.getTotalCapacity(), pool.getMemoryUsed());
        }
        System.out.println();
        Thread.sleep(2000);
    }
}

}

`

查看java线程的情况,虽然线程数很多,但是内存增长时线程数基本没有什么变化。

$ jstack pid |grep 'java.lang.Thread.State' |wc -l

或者
$ cat /proc/pid/status |grep Thread

对java做了一次heap dump,使用eclipse的MAT查看堆内使用情况,没有发现明显有哪个对象数量有明显异常,heap的大小也只有几百兆。

$ jmap -dump:file=/tmp/heap.bin

发现stack dump里的global jni reference一直在增长,怀疑是jni调用存在内存溢出。

$ jstack pid |grep JNI

查找了jar包里的.so/.h等c文件,发现jruby、jthon等jar包里有jni相关的文件。

$ wtool jarfind *.so .

上网发现确实有不少jruby内存溢出的issue。把这些jar包直接删掉之后观察global jni reference数量还是在涨,内存增长情况也没有改善。

之后突然想到full gc的问题,对增长中的java进程做了一次full gc,global jni reference数量由几千个下降到几十个,但是占用内存还是没有变化,排除掉global reference的可能性。

用pmap查看进程内的内存情况,发现java的heap和stack大小都没啥变化,但是定期会多出来一个64M左右的内存块。
image

使用gdb观察内存块里的内容,发现里面有一些接口的返回值、mc的返回值、还有一些类名等等

gdb: dump memory /tmp/memory.bin 0x7f6b38000000 0x7f6b38000000+65535000

$ hexdump -C /tmp/memory.bin

$ strings /tmp/memory.bin |less
image
上网搜索后发现有人遇到过这个问题,在这个网页里有ibm对64M问题的研究。依照网站上说的办法,把MALLOC_ARENA_MAX参数调成1,发现virtual memory正常了,res也小了1G左右。同时hadoop的issue里也有一些性能方面的测试,发现MALLOC_ARENA_MAX=4的时候性能会提升,但是他们也说不清楚为什么。
修改之后程序启动时的virtual memory明显降低,res也降低到了3.2g:
image
本来以为到这里应该算是解决了,但是这个程序跑了几天之后内存依然在上涨,只是内存块由很多64M变成了一个2g+的普通native heap。

继续寻找线索,在一些关于MALLOC_ARENA_MAX这个参数的讨论里也发现一些关于glibc的其它参数。比如M_TRIM_THRESHOLD和M_MMAP_THRESHOLD或者MALLOC_MMAP_MAX_,试用之后发现依然没有效果。

试着从glibc的malloc实现上找问题,比如这里和这里,同样没有什么进展。

尝试用strace和ltrace查找malloc调用,发现定期有32k的内存申请,但是无法确定是从哪调用的。

尝试用valgrind查找内存泄露,但是jvm跑在valgrind上几分钟就crash了。

在网上查到了一个关于thread pool用法错误有可能导致内存溢出的问题,可以写一个小程序重现:
`
public static void main(String[] args) throws InterruptedException {
Logger.getRootLogger().setLevel(org.apache.log4j.Level.ERROR);
while(true) {
long max = Runtime.getRuntime().maxMemory();
long total = Runtime.getRuntime().totalMemory();
long free= Runtime.getRuntime().freeMemory();
System.out.println("max:"+max+",total:"+total+",free:"+free);
for (int i = 0; i < 10000; i++) {
ThreadPoolExecutor executor = new ThreadPoolExecutor();
}
Thread.sleep(1000);
}
}

`
但是用btrace挂了一天也没有发现有错误的调用,源代码里也没找到类似的用法。

重新用MAT在heap dump里查找是否有native reference,发现finalizer队列里有很多java.util.zip.Deflater的实例,上网搜索发现这个类有可能导致native内存溢出,使用的jesery框架里有这个问题导致gzip异常的issue;用btrace监视发现有大量这个类的构造函数被调用,但是经过几次full gc的观察,每次full gc后finalizer队列里的Deflater数量都会减少到个位数,但是内存依然在上涨;同时排查了线上配置,发现没有开启gzip。

也发现了有人说SunPKCS11有可能导致内存泄露,但是也没发现有相关java对象。

尝试把Xss参数调到256k,运行几天后发现内存维持在5.7g左右,比较稳定,但是从各种角度都无法解释为什么xss调小会影响native heap的大小。

怀疑是JIT的问题,用-Xint或者-XX:-Inline方式启动之后发现内存依然增长。

本来排查到这里已经绝望了,但是最后想到是不是JDK本身有什么bug?

查看jdk的changelog,发现线上使用的1.7-b15的版本比较老,之后有一些对native memory leak的修复。尝试用新的jdk1.7-u71启动应用,内存竟然稳定下来了!

在升级jdk、限制directbuffer大小为256M、调整MALLOC_ARENA_MAX=1后,4倍流量的tcpcopy运行几天后内存占用稳定在5G;只升级了jdk,其它参数不变,运行一天后内存为5.4G,是否上涨还有待观察。对比之前占用6.8G左右,效果还是比较明显的

  1. 其它参考资料
    Linux Native Memory issues for WebSphere Application Server
    vmoptions

concurrence in practice

java多线程体系图

link

基本概念

什么叫线程安全

多线程调用一个对象时, 不需要考虑线程的交替执行,也不需要额外的同步,调用这个对象都能获得正确的结果; 可以简单分为两个方面,
1. 多线程环境下代码的调用顺序
可以依据happen-before原则保证代码执行的顺序如你所愿, 不被重排序
2. 存储的可见性
因为cpu有多核心和对应的多级缓存, 不加额外的同步会导致A核心对应的缓存内容对 B核心的线程不可见.

线程安全的实现方法

  • 互斥同步

synchronized, wait notify, reentrantLock, 阻塞队列

  • 非阻塞同步

线程自旋, 使用CAS保证原子

  • 无同步方案

对象不共享(局部变量); 线程本地变量(threadLocal); 不可变对象(final)

线程本地变量(threadLocal)

每个thread里面持有一个私有的threadLocalMap, map里有个entry数组, entry持有一个ThreadLocal的弱引用,引用关系如下图所示, 虚线表示弱引用
image
若不存在对threadLocal 的强引用, 则entry会被回收, 变成null, 但是entry中的value未被回收, 若当前线程不结束, 则保持有一条这样的引用链: thread ref -> thread -> threadLocalMap -> entry -> val,

  1. 为什么弱引用回收后, entry变null, 到底什么是弱引用; entry变为null之后, val为何不会被回收,就是为什么还存在 : entry -> val
  • 使用方法
  1. 使用者需要手动调用remove函数,删除不再使用的ThreadLocal.若后续线程没有结束, 但是却没有使用get, set或remove , 则已经为null的entry对应的value没有释放, 会造成内存泄漏
  2. 还有尽量将ThreadLocal设置成private static的,这样ThreadLocal会尽量和线程本身一起消亡。
        private void set(ThreadLocal<?> key, Object value) {

            // We don't use a fast path as with get() because it is at
            // least as common to use set() to create new entries as
            // it is to replace existing ones, in which case, a fast
            // path would fail more often than not.

            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);

            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                ThreadLocal<?> k = e.get();

                if (k == key) {
                    e.value = value;
                    return;
                }

                if (k == null) {// 这段表明key已经被GC, 因为是弱引用
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }

            tab[i] = new Entry(key, value);
            int sz = ++size;
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }
        

问题

  • 发现entry的key, 即threadLocal, 被GC后, 会对无用的entry进行资源释放(对value和entry进行置空), 并再次rehash, 这过程没看懂, 感觉都是为了释放内存, 防止内存溢出

不可变对象(final)

只能初始化一次,不能对原始变量进行赋值或修改对象的引用(可以修改引用的属性).对有可能需要线程安全的变量var,若需要声明为static,则需要为private,且对外部调用该var的方法实现同步,加volatile或synchronize;
否则若是public,则需要声明为final,防止外部修改,这样就不能进行变量的修改操作了

锁理论

参见 link

自旋锁

  • 简单自旋锁
    线程a占用锁的时候, 线程b此时不能获取, 会等待一段时间, 跟互斥锁相反
public class TASLock {
    AtomicBoolean state = new AtomicBoolean(false);

    public void lock() {
        while (state.getAndSet(true)) {
            ;
        }
    }

    public void unlock() {
        state.set(false);
    }
}

// 更高性能的写法
public class TTASLock {
    AtomicBoolean state = new AtomicBoolean(false);

    public void lock() {
        while (true) {
            while (state.get()) {
                ;
            }
            if (! state.getAndSet(true)) {
                return;
            }
        }
    }

    public void unlock() {
        state.set(false);
    }
}
// 简而言之, 相比第一种写法避免了其他等待线程一直在写, 导致缓存和主存间不停的数据交换, 也占用了总线, 直接表现就是释放锁和获取锁都变慢, 
但是第二种写法 持有锁的线程在释放锁的时候, 会引起其他线程竞争, 造成总线流量暴增, 难以获取到锁.
  • 自旋锁和互斥锁的区别
    自旋锁在等待时会占用cpu时间片, 适用于线程切换开销大于等待的情况; 互斥锁在等待时会被阻塞, 不占用cpu时间片,但是cpu切换上下文会消耗额外资源, 适用于线程等待时间大于线程切换的情况
  • 指数后退锁
public class BackoffLock {
    private AtomicBoolean state = new AtomicBoolean(false);
    private static final int MIN_DELAY = 10;
    private static final int MAX_DELAY = 100;

    public void lock() {
        Backoff backoff = new Backoff(MIN_DELAY, MAX_DELAY);
        while (true) {
            while (state.get()) {
                ;
            }
            if (! state.getAndSet(true)) {
                return;
            } else {
                backoff.backoff();
            }
        }
    }

    public void unlock() {
        state.set(false);
    }
}

class Backoff {
    private final int minDelay, maxDelay;
    int limit;
    final Random random;

    public Backoff(int min, int max) {
        minDelay = min;
        maxDelay = max;
        limit = minDelay;
        random = new Random();
    }

    public void backoff() {
        int delay = random.nextInt(limit);
        limit = Math.min(maxDelay, 2 * limit);
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            ;
        }
    }
}
  • 排队自旋锁(基于数组的锁)
    每个线程按照到来的先后顺序进行排队, 每个时刻只能由一个线程获取锁, 其他线程在非阻塞等待, 当持有锁的线程释放时, 排下个位置的线程才能获取到锁.
public class ALock {
    ThreadLocal<Integer> mySlotIndex = new ThreadLocal<Integer>();
    AtomicInteger tail;
    volatile boolean [] flag;
    int size;

    public ALock() {
        size = 100;
        tail = new AtomicInteger(0);
        flag = new boolean[size];
        flag[0] = true;
    }

    public void lock() {
        int slot = tail.getAndIncrement() % size;
        mySlotIndex.set(slot);
        while (! flag[slot]) {
            ;
        }
    }

    public void unlock() {
        int slot = mySlotIndex.get();
        flag[slot] = false;
        flag[(slot + 1) % size] = true;
    }
}

问题
CLH队列锁, MCS锁, 遗留

CAS(Compare and set)操作

如果当前值和期望值相等, 则才赋值, 等于说是一种非阻塞的多线程赋值操作.但是会带来cache一致性流量问题, 导致多组cache在同步内存时,导致总线流量增加, cpu使用率增加, 线程竞争的问题.

缓存一致性协议

博文
cpu具有多级缓存, cpu只能逐层找数据, 例如从一级缓存找二级, 最后找到主存

基本定律:在任意时刻,任意级别缓存中的缓存段的内容,等同于它对应的内存中的内容。

回写定律:当所有的脏段被回写后,任意级别缓存中的缓存段的内容,等同于它对应的内存中的内容。

  • 缓存段
    每个缓存段对应一段物理内存

当我提到“缓存段”的时候,我就是指一段和缓存大小对齐的内存,不关心里面的内容是否真正被缓存进去(就是说保存在任何级别的缓存中)了。

缓存一致性协议有多种,但是你日常处理的大多数计算机设备使用的都属于“窥探(snooping)”协议

  • 窥探(snooping)
    因为所有的内存传输都在总线(bus)上,所有的处理器都能看到总线; 同一个指令周期只有一个缓存能够读写内存;
    当一个缓存去读写内存的时候, 其他处理器都会得到通知, 使得自己缓存所对应的段失效.

  • MESI以及衍生协议

    • 失效(Invalid)缓存段,要么已经不在缓存中,要么它的内容已经过时。为了达到缓存的目的,这种状态的段将会被忽略。一旦缓存段被标记为失效,那效果就等同于它从来没被加载到缓存中。
    • 共享(Shared)缓存段,它是和主内存内容保持一致的一份拷贝,在这种状态下的缓存段只能被读取,不能被写入。多组缓存可以同时拥有针对同一内存地址的共享缓存段,这就是名称的由来。
    • 独占(Exclusive)缓存段,和S状态一样,也是和主内存内容保持一致的一份拷贝。区别在于,如果一个处理器持有了某个E状态的缓存段,那其他处理器就不能同时持有它,所以叫“独占”。这意味着,如果其他处理器原本也持有同一缓存段,那么它会马上变成“失效”状态。
    • 已修改(Modified)缓存段,属于脏段,它们已经被所属的处理器修改了。如果一个段处于已修改状态,那么它在其他处理器缓存中的拷贝马上会变成失效状态,这个规律和E状态一样。此外,已修改缓存段如果被丢弃或标记为失效,那么先要把它的内容回写到内存中——这和回写模式下常规的脏段处理方式一样。

第一,在多核系统中,读取某个缓存段,实际上会牵涉到和其他处理器的通讯,并且可能导致它们发生内存传输。写某个缓存段需要多个步骤:在你写任何东西之前,你首先要获得独占权,以及所请求的缓存段的当前内容的拷贝(所谓的“带权限获取的读(Read For Ownership)”请求)。

第二,尽管我们为了一致性问题做了额外的工作,但是最终结果还是非常有保证的。即它遵守以下定理,我称之为:
MESI定律:在所有的脏缓存段(M状态)被回写后,任意缓存级别的所有缓存段中的内容,和它们对应的内存中的内容一致。此外,在任意时刻,当某个位置的内存被一个处理器加载入独占缓存段时(E状态),那它就不会再出现在其他任何处理器的缓存中。

原生同步

同步代码块

  • synchronized

    • 可以将需要线程安全的变量保存在一个object,通过object 的 intrinsic lock 去保证,而且线程安全的所有变量都需要用一个lock,而不是多个
    • 三种用法, 推荐使用第三种, 因为前两种等于直接暴露了锁对象, 锁对象是实例, 任何其他可以获得该实例的外部代码, 都有可能恶意获取该锁, 并锁住; 锁方法和锁实例等于是直接锁住了该对象, 其他线程想要获取其他不需要同步的方法也会阻塞.
    // 第一种:直接锁方法
    public synchronized void print(){
        ...//逻辑代码
    }
    // 第二种:锁方法中的代码块
    public void print(){
        synchronized(this){
                ...//需要同步的代码
        }  
             ...//非同步代码
    }
    第三种方法:单独创建锁对象
    private final SynObj=new Object();
    public void print(){
        synchronized(SynObj){
                ...//需要同步的代码
        }  
             ...//非同步代码
    }
    
    
    
    • synchronized 与 reentrantlock 的区别
      • 后者更灵活, 可以跨方法解锁, 可以实现公平锁和非公平, 和中断获取锁
      • 原理不同, 前者基于java原生的互斥锁, 线程阻塞; 后者基于CAS
  • wait, notify, notifyAll

    • wait
      调用之前需要啊占用对象锁, 否则会抛出 IllegalMonitorStateException , 调用后会释放对象的monitor lock. 直到其他线程调用notify / notifyAll 或者超过指定的时间;

    • wait 与LockSupport, sleep 的区别
      LockSupport不需要获取到 monitor lock , sleep调用后不会释放monitor lock

    • notify
      唤醒任意一个正在等待object monitor的线程, 当前线程必须持有monitor , 否则会抛出 IllegalMonitorStateException, 其他在等待monitor的线程会发生竞争

    • notifyAll
      与notify的区别是, 会唤醒所有等待线程, 但是只有一个线程可以竞争到monitor, 其他线程在等待释放monitor

其他线程方法

join, yield, sleep

  • sleep

    不释放正在占用的对象的monitor lock,线程处于block的状态

  • join

    在主线程中调用threadA.join(), 主线程需要等待threadA 死亡才会继续执行

  • yield

    把当前cpu的使用交给调度器进行分配, 调度器可以忽略该信息. 细节见
    link

阻塞和可中断方法

当一个方法能够抛出InterruptedException,侧面说明该方法是可以被阻塞的。如果该方法被中断,会提前结束阻塞状态。


// 第一个方法是static,针对当前线程;第二个方法是针对调用线程的;当参数是true的情况,会清除interrupted标志位(意味着isInterrupted 返回false),interrupted仅仅是个标志位,不影响线程的状态。



 public static boolean interrupted() {
        return currentThread().isInterrupted(true);
    }

public boolean isInterrupted() {
        return isInterrupted(false);
    }
    
 public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

// 第三个方法会给当前线程发送interruptEx, 并不会让 interrupted 标志位变为true,所以需要中断线程的话需要如下写法: 

   public static void main(String a[]) {
        try {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    while (!Thread.currentThread().isInterrupted()) {
                        System.out.println("running");
                        try {
                            sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            Thread.currentThread().interrupt();
                        }
                    }
                    System.out.println("thread is going to die.");

                }
            });
            thread.start();
            sleep(5000);
            thread.interrupt();
            thread.join();// 等待thread 线程结束
            System.out.println("thread terminated");
            sleep(1000);
            System.out.println("main thread terminated");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    

处理不可中断线程

问题

  • IO不可中断?

volatile

  • The value of this variable will never be cached thread-locally: all reads and writes will go straight to "main memory",and read and write is atomic,means that when write is begined,then read must wait write end,so volatile i++ is not ensured 一致性.

  • Access to the variable acts as though it is enclosed in a synchronized block, synchronized on itself.

  • This can be useful for some actions where it is simply required that visibility of the variable be correct and order of accesses is not important. Using volatile also changes treatment of long and double to require accesses to them to be atomic.

  • Happens-Before Guarantee:

    在写volatile变量之前的所有写nonVolatile变量的操作会被写入到main memory,在读volatile变量之后的所有读nonVolitale的操作都会从main memory中读.

       Thread A:
       sharedObject.nonVolatile = 123;
       sharedObject.counter     = sharedObject.counter + 1;
    
       Thread B:
       int counter     = sharedObject.counter;
       int nonVolatile = sharedObject.nonVolatile;
    

    so the nonVolatile is also be ensured read and write compatibility.this can also used for ensuring instructions order 一致性.

  • atomic problem : getter, setter and change reference

JLS 17.7

  • Non-Atomic Treatment of double and long

For the purposes of the Java programming language memory model, a single write
to a non-volatile long or double value is treated as two separate writes: one to each
32-bit half. This can result in a situation where a thread sees the first 32 bits of a
64-bit value from one write, and the second 32 bits from another write.
Writes and reads of volatile long and double values are always atomic.
Writes to and reads of references are always atomic, regardless of whether they are
implemented as 32-bit or 64-bit values.
but to ensure 一致性, declare variable volatile.

原生锁优化(jdk1.6 之后)

public void demoMethod(){  
		synchronized(lock){   
			//do sth.  
		}  
		//做其他不需要的同步的工作,但能很快执行完毕  
		synchronized(lock){   
			//do sth.  
		} 
	}
这种情况,根据锁粗化的**,应该合并
public void demoMethod(){  
		//整合成一次锁请求 
		synchronized(lock){   
			//do sth.   
			//做其他不需要的同步的工作,但能很快执行完毕  
		}
	}
	
  • 锁清除(Lock Elimination)

    锁清除是编译器级别的配置. 因为有些时候在线程安全的场景下使用某些内部做了额外同步的类(例如局部变量中使用到了内部加锁的对象, StringBuffer, Vector), 就可以启用JVM配置: 开启逃逸分析和锁清除,

    -server -XX:+DoEscapeAnalysis -XX:+EliminateLocks
    
    • 开启逃逸分析, 例如sb这个对象有可能被在程序的其他地方被修改而可能导致线程安全问题, 若编译器分析该对象没有出现逃逸情况, 就可以把多余的锁去掉以提高性能
    public static StringBuffer craeteStringBuffer(String s1, String s2) {
    	StringBuffer sb = new StringBuffer();
    	sb.append(s1);
    	sb.append(s2);
    	return sb;
    }
    
    当JVM参数为:
    -server -XX:+DoEscapeAnalysis -XX:+EliminateLocks
    输出:
    craeteStringBuffer: 302 ms
    JVM参数为:
    -server -XX:+DoEscapeAnalysis -XX:-EliminateLocks
    输出:
    craeteStringBuffer: 660 ms
    
  • 偏向锁(Biased Locking)

    是指之前已经获取到锁的线程再次请求锁时, 不需再次申请锁, 可以直接进入同步块, 适用于无竞争的情况, 在竞争激烈情况下会加重系统负担, 因为每次都要判断该线程是否偏向; 当其他线程请求相同的锁时, 之前获取锁的线程的不具备偏向锁性质.


-XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0

  • 轻量级锁(Lightweight Locking)
    目的是减少线程互斥的几率, 使用CAS操作来加锁, 若轻量级锁加锁失败, 使用重量级锁.

  • 自旋锁(Adaptive Spinning)
    让线程进入循环等待锁, 这段时间仍然占用cpu时间, 也是为降低切换上下文带来的开销, 可以用参数-XX:+UseSpinning
    , -XX:PreBlockSpin

  • 重量级锁(操作系统层级)

JUC多线程支持体系

Executor

  • 大致流程

    每个新任务到来都会创建一个worker执行任务(不管是否有其他核心线程空闲),如果已经超过核心线程的数量,会把任务放进队列排队, worker继承自AQS 实现了Runnable, 继承自AQS表示worker本身就是一个锁, 且该锁是独占锁, 不可重入, 若被占用表示当前worker正在执行task, 若不被占用表示当前worker空闲.

  • 核心线程数

  • 队列

    • ArrayBlockingQueue

      基于数组的有界阻塞队列.

    • 大致流程
      先获取到ReentrantLock lock, 若满足await的条件, 然后await, 进入条件队列(condition queue), 释放之前的lock, 并阻塞, 等待其他线程调用signal 唤醒,唤醒了之后再次进入阻塞队列(sync queue)等待获取互斥锁.

    • 读写互斥,读读互斥,写写互斥.

    // ArrayBlockingQueue 的功能与下列功能类似, 实际生产中可以直接使用 ArrayBlockingQueue
    
    static class BoundedBuffer {
		final Lock lock = new ReentrantLock();
		final Condition notFull = lock.newCondition();
		final Condition notEmpty = lock.newCondition();

		final Object[] items = new Object[100];
		int putptr, takeptr, count;

		public void put(Object x) throws InterruptedException {
			System .out.println("put wait lock");
			lock.lock();
			System.out.println("put get lock");
			try {
				while (count == items.length) {
					System.out.println("buffer full, please wait");
					notFull.await();
				}
					
				items[putptr] = x;
				if (++putptr == items.length)
					putptr = 0;
				++count;
				notEmpty.signal();
			} finally {
				lock.unlock();
			}
		}




		public Object take() throws InterruptedException {
			System.out.println("take wait lock");
			lock.lock();
			System.out.println("take get lock");
			try {
				while (count == 0) {
					System.out.println("no elements, please wait");
					notEmpty.await();
				}
				Object x = items[takeptr];
				if (++takeptr == items.length)
					takeptr = 0;
				--count;
				notFull.signal();
				return x;
			} finally {
				lock.unlock();
			}
		}
	}
	
	
    
  • LinkedBlockingQueue

    基于链表的无界阻塞队列,吞吐量高于 ArrayBlockingQueue , Executors.newFixedThreadPool()使用了这个队列

  • SynchronousQueue

    不存储元素的阻塞队列, 新任务必须等待有线程空闲,否则一直阻塞. 吞吐量高于 LinkedBlockingQueue, Executors.newCachedThreadPool默认使用这个队列

  • PriorityBlockingQueue

一个具有优先级的无限阻塞队列
* 问题总结
可能出现以下异常 futuretaskcastex
futuretaskcastex

解决办法如下, 新写一个futureTask, 并重载threadPool的newTaskFor() 方法.
overridefuturetask

  • 最大线程数

    当队列满了, 才会创建多的线程.

  • 线程保持活跃时间

    当worker数量大于核心线程的数量, 并且空闲时间大于keepAliveTime之后, 线程会被终止

  • 饱和策略(RejectedExecutionHandler)

    当队列和线程池都满的时候, 需要用策略去处理到来的新的任务.

    • AbortPolicy:直接抛出异常。
    • CallerRunsPolicy:只用调用者所在线程(可以说是调用线程池的主线程)来运行任务。
    • DiscardOldestPolicy:丢弃队列里最老的一个任务,并执行当前任务。
    • DiscardPolicy:不处理,丢弃掉。
      当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
  • 线程工厂方法

    用于设置创建线程的工厂,统一管理线程池初始化线程.

  • FutureTask

    • Runnable
      不可返回结果和抛出checked exception

    • Callable
      可返回结果和抛出checked exception

    • Future
      是一个接口,Future就是对于具体的Runnable或者Callable任务的执行结果进行
      取消、查询是否完成、获取结果、设置结果操作

    • FutureTask

      FutureTask implement RunnableFuture

public interface RunnableFuture<V> extends Runnable, Future<V> {  
    /** 
     * Sets this Future to the result of its computation 
     * unless it has been cancelled. 
     */  
    void run();  
}  

Collections

  • queue(BlockingQueue)

    大致流程: 读写线程先抢占进入阻塞队列(sync queue), 获取到互斥锁后, 读线程和写线程分别进入一个条件队列, 并若队列里元素为空,则读阻塞,直到新元素写入成功, 通知读队列的第一个线程,该线程进入阻塞队列排队, 直到获取到锁. 写阻塞同理;

    • LinkedBlockingQueue

    • ArrayBlockingQueue

    • ConcurrentLinkedQueue

      线程安全的非阻塞队列,

  • CopyOnWriteArrayList, CopyOnWriteArraySet
    每次写数组的时候,都会用ReentrantLock 锁住,然后生成一个新的数组进行拷贝, 只保证写是线程安全的,不保证写立马被读可见, 因为读的时候没有加锁. 适用于读多写少的场景.

  • ConcurrenceHashMap

    jdk1.8之前, 采用分段锁机制, 默认分16个Segment , Segment继承自ReentrantLock, 对每个段的table进行线程间的同步.
    从1.8开始,采用CAS算法,多个线程同时更新只有一个线程能成功。

    • isEmpty和size都是基于估算
    • 由于get操作不是阻塞的, 所以只能看到写入完成的值,新增和更新有点不同, 只有新增完成的entry才能看到, 引用下面一段jdk-1.8的注释来表达这个意思; 但是更新是满足volatile HB原则的.

    retrievals reflect the results of the most recently completed update operations holding upon their onset.

    • iterator

    iterator is designed to be used only one thread at a time.多个线程共用一个iterator实例, 可能会抛出 java.lang.IllegalStateException. 所以需要在每个线程用个单独的iterator实例

      * 一个线程创建iterator之后对另一个线程对map元素的增删改,是可见的.
    

Lock

  • ReentrantLock, Condition, ReadWriteLock, CountDownLatch, CyclicBarrier, Semaphore 基本都是基于AQS实现的, 可以说都是AQS的子类.

  • AbstractQueuedSynchronizer(AQS)
    当多个线程对一个锁的发起竞争,用一个标志位state表示锁是否被占用,要么获取到该锁,否则线程进入队列排队,阻塞。

    • 数据结构
      队列是一个双向链表,节点持有指向前后节点的引用,该节点的线程,该节点类型(是独占还是共享),头结点的线程表示持有锁的线程,后续排队的线程在等待获取锁。

    • 作为父类只关心资源的访问形式(互斥还是共享),资源访问不了了线程如何等待(阻塞),线程等待不了了如何返回,并不关心资源什么时候被释放,这由子类去定义

    • ConditionObject (条件队列)
      用于实现blockQueue, 读线程和写线程分别进入一个条件队列,若元素为空,则读阻塞,直到写入成功, 通知读队列的第一个线程, 写阻塞同理; 用于替换生产者和消费者问题中的 wait, notify, notifyall, Condition的await等同于wait, signal等同于notify.

  • ReentrantLock

    • 资源释放标志
      state初始化为0,判断 state==0,且只能由一个线程获取该资源
    • FairSync
      判断资源是否被占用,若被占用进入队列等待(阻塞),每次cpu释放,只获取队列头节点的后继节点的线程,其他线程仍然等待;
      阻塞线程可以用 LockSupport park()方法。
    • NonFairSync
      多个线程直接采用 CAS的方式抢占资源,抢占不到则与 FairSync处理逻辑一样。
      问题
    • 阻塞的线程如何取消等待? 通过中断
  • CountDownLatch
    设定一个计数器,使所有线程都等待(头节点自旋,其余节点阻塞),直到计数器变成0,
    因为是共享模式的锁,所以头结点得到释放后,会依次释放后续排队的线程,都可以获取到资源。

    • 资源释放标志

      state初始化为count,判断 state==0,则队列所有线程都可以获取该资源。

  • Semaphore(信号量)

    • 帮助记忆:厕所的钥匙,拿到钥匙(permit)才可以上厕所

    • 资源释放的标志

      permit的数量大于零

Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocking acquirer.
可由任何人获取 permits ,任何人释放 permits,并不需要permits的持有者才能释放 permit;binary Semaphore(permit=1)这点与Mutex有别,Mutex只能由锁的获取者释放。可以根据资源的数量控制同时访问的线程数量,

  • CyclicBarrier(栅栏)

    • 帮助记忆:等待一定数量的线程在一定条件之后,一起开始运行某个任务。类似于运动员等待着发令枪一样。
    • 与CountDownLatch的区别

    CountDownLatch强调主线程(可能多个)等待任务线程完成任务后执行,任务线程执行完需要countDown(), countDown可以在多个线程中调用, 也可以在一个线程中调用多次, 且 CountDownLatch不能重用;CyclicBarrier 强调多个线程完成各自任务后,一起执行某个后续的任务,例如MapReduce. await必须在各个线程中调用, 若某个线程在await中超时, 中断或者结束, 其他在等待await的线程会收到BrokenBarrierException , 且可以通过调用reset恢复初始状态, 重用.

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. The barrier is called cyclic because it can be re-used after the waiting threads are released.

非阻塞算法

一个线程的失败和挂起不会引起其他些线程的失败和挂起,这样的算法称为非阻塞算法。非阻塞算法通过使用底层机器级别的原子指令来取代锁,从而保证数据在并发访问下的一致性。在更细粒度(机器指令)的级别上保证操作是原子的, 并发性提高, 且大大减少线程切换的开销. 同时,由于几乎所有的同步原语都只能对单个变量进行操作,这个限制导致非阻塞算法的设计和实现非常复杂。

  • 例如CAS ,原子变量类

  • 重量级锁的劣势

    JVM可以对非竞争锁的获取和释放进行优化,但是多个线程同时请求锁,JVM就需要求助操作系统,导致一些线程被挂起再恢复;挂起和恢复的过程会带来很大开销,而且当占有锁的线程被休眠时会导致其他线程也暂停所以当频繁发生锁的竞争时,会带来性能的降低。

  • 轻量级锁CAS

    • 优点
      基于操作系统层级的调用,不经过JVM ? 更快,非阻塞算法,不会引起线程的挂起和恢复,只保证操作是原子的,但是要保障数据的一致性(内存的可见性),仍然需要对数据加上volatile。
    • 缺点
      线程可能需要处理竞争的结果;会出现ABA问题,可以用AtomicStampedReference,记录每次更新的版本号。
  • AtomicReference
    可以判断一个实例是否被更新过,比较当前对象是否和之前的对象相等(==),等于再更新

  • 原子类和加锁(ReentrantLock)的性能比较
    在较低的竞争下,原子类(不会有线程切换的开销)的吞吐量更大; 高竞争下,加锁的吞吐量更大;可以类比于堵车时信号灯更好,拥堵较低时环岛吞吐量更好。

问题

  • 线程转换的状态,waiting和block的区别
    • waiting 指等待另一个线程的某个动作, 例如等待notify, 等待其他线程结束.
    • block 指等待获取monitor lock.都不会消耗cpu时间片

java memory model

  • 在多处理器的体系下, 处理器会牺牲一定程度的存储一致性(主存和处理器缓存的数据一致),来换取性能的提升,导致代码执行的顺序跟预想的不一样.
  • happens-before
    • 定义

      想要保证动作A的结果被B所看到, 则A,B必须满足happens-before原则

    • 原则

      见 concurrence in practice 16.1.1 P225

  • final
    final 修饰的field(例如final 数组, final map), 可以保证在多线程环境下被正确初始化之后才被调用,防止指令的重排序,注意若用final 修饰的object , 只有final修饰的域才能保证初始化是线程安全的.

publilsh safely in concurrence

https://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
*


// Works with acquire/release semantics for volatile in Java 1.5 and later
// Broken under Java 1.4 and earlier semantics for volatile
class Foo {
    private volatile Helper helper;
    public Helper getHelper() {
        Helper localRef = helper;
        if (localRef == null) {
            synchronized(this) {
                localRef = helper;
                if (localRef == null) {
                    helper = localRef = new Helper();
                }
            }
        }
        return localRef;
    }
    
/*
Note the local variable "localRef", which seems unnecessary. The effect of this is that in cases where helper is
//already initialized (i.e., most of the time), the volatile field is only accessed once (due to "return localRef;"
instead of "return helper;"), which can improve the method's overall performance by as much as 25 percent.
*/

}


// another lazy init, more simple
// Correct lazy initialization in Java
class Foo {
    private static class HelperHolder {
       public static final Helper helper = new Helper();
    }

    public static Helper getHelper() {
        return HelperHolder.helper;
    }
}

工作上出现的并发bug

  • 并发下,全局变量的导致的线程不安全问题, 通过改为局部变量, 在每个线程的栈区, 则解决问题
  • 线程池使用优先级队列, 出现futureTask cant cast to comparable ex.

hadoop

数据的存储和分析

  • 关系型数据库和MR的区别?
    • 因为硬件的瓶颈,数据读取和写入时间在大数据时代很慢,所以需要分布式并行处理
    • MR适合批处理, 一次写入多次读取; 关系型数据库适合交互式, 多次读和写
    • 数据本地化,由于带宽很贵, 计算节点就近处理数据.

HDFS

参考自 hadoop官网-HdfsDesign

Assumptions and Goals

  1. 集群容错, 针对硬件等的失败, 能够监测并自动恢复, 保持数据可用.
  2. 流式数据处理, 更适用于批量的数据处理, 而不是交互式的数据读取, 例如mysql
  3. 用于存储海量数据集, 具备良好扩展性
  4. 适用于一次写入多次读取, 支持append和trancate, 不支持任意点的对数据的更改.
  5. "移动计算比移动数据性价比更高", 可以把计算移动到它所需要的数据旁边, 移动到同一个节点计算.

NameNode and DataNodes

  • namenode

执行filesystem namespace的opening, closing, 和修改文件和目录, 并保存了block和datanode的映射

  • datanode

负责处理客户端的读写请求, 包括对block的创建, 删除和复制等操作.

Replica Placement: The First Baby Steps

如果默认的三份拷贝(replication factor is three), local rack(本地机架)有两份, 其他机架有一份, 机架失效相比于node失效的几率是非常小的, 所以将两份数据都放在同一个机架, 可以充分利用同一个机架的带宽大于不同机架间的带宽, 提高读写效率.

读数据的时候, 采用就近原则, 读取距离客户端最近的那份副本.

Robustness

The three common types of failures are NameNode failures, DataNode failures and network partitions

  • Data Disk Failure, Heartbeats and Re-Replication

dataNode会周期性的发送心跳给nn, 若心跳失效则不转发IO请求给dn, 并按照之前的副本数新增额外的副本.

  • Cluster Rebalancing

如果某个dn的存储空间低于某个阈值, 则会自动转移数据; 如果对某个文件的请求大量增加, 则会增加副本, 并把请求分到其他节点上, 截止到hadoop-2.6.5这个功能还没实现

  • Data Integrity

通过计算block的checksum, 来保证数据完整性.

stores these checksums in a separate hidden file in the same HDFS namespace.

  • Metadata Disk Failure

FsImage, EditLog的损坏会导致hdfs不可用, 但是nn会保持这两个东西是多份的, 但是副本间的同步更新会降低nn的tps

  • Snapshots

快照功能, 可以使hdfs可以rollback到历史版本

Data Organization

  • data block
  • Staging

客户端写文件的请求, 并不会直接把数据写到nn, 而是写到一个本地的临时目录, 直到数据量达到了一个block, client才会把数据转移到目标dn和block; 如果nn在文件关闭之前down, 则文件会丢失.

  • Replication Pipelining

像上一节所说, 文件开始写到dn的时候, 像管道一样, 先写第一个dn, 并且同时写第二个dn, 第二个接收到数据的时候, 会同时写第三个dn.

Space Reclamation

  • File Deletes and Undeletes

被删除的文件不会立马物理删除, 先保存在 /trash 目录, 直到一定的时间, nn delete it from HDFS namespace, 然后对应的block 被释放.

  • Decrease Replication Factor

The next Heartbeat transfers this information to the DataNode. The DataNode then removes the corresponding blocks and the corresponding free space appears in the cluster. Once again, there might be a time delay between the completion of the setReplication API call and the appearance of free space in the cluster(在setReplication() 完成和实际物理空间的释放间存在延迟)

HDFS High Availability Using the Quorum Journal Manager

  • Architecture

两台nn, 一台active, 一台standby, 通过独立的daemon JN去完成同步,一台nn写, 另一台读, 保证namespace是同步的; 而且所有的dn都会向两台nn发送block list和心跳

  • Hardware resources

奇数n 台JN machine, 可以容忍 (n-1)/2个错误; JN daemon 是非常轻量级的.

  • dfs.ha.fencing.methods

用于隔离active和standby nn, 当需要切换nn的时候, 需要进行三个方面的隔离:

  1. 共享存储fencing:确保只有一个NN可以写入edits。QJM中每一个JournalNode中均有一个epochnumber,匹配epochnumber的QJM才有权限更新JN。当NN由standby状态切换成active状态时,会重新生成一个epoch number,并更新JN中的epochnumber,以至于以前的ActiveNN中的QJM中的epoch number和JN的epochnumber不匹配,故而原ActiveNN上的QJM没法往JN中写入数据(后面会介绍源码),即形成了fencing
  1. 客户端fencing:确保只有一个NN可以响应客户端的请求。
  1. DataNode fencing:确保只有一个NN可以向DN下发命令,譬如删除块,复制块,等等。

HDFS Federation

多个namenode独立, 分别管理各自的namespace, 解决了namenode需要的内存过大的问题, 可以水平扩展.

Automatic Failover

  • Components

two new components to an HDFS deployment: a ZooKeeper quorum, and the ZKFailoverController process (abbreviated as ZKFC)
失败监测: nn持有一个对zk的持久会话, nn失效, zk会知道
nn的选举: 提供对active nn的选举.

  • ZKFC的职责
  1. 是一个zk client, 负责监控和管理nn, 和nn在一台机器.
  1. Health monitoring, zkfc会ping nn, 了解nn的健康状况
  1. ZooKeeper session management, zkfc 持有一个对nn的session,
  2. 基于zk的选举,

HDFS Snapshots

用于数据备份, 数据保护, 和容灾恢复.

  1. 创建snapshot并没有copy block, 只是记录的block list和block size.
  2. 如果在snapshot之后有修改, 则修改按时间倒序, 获取snapshot的时候把修改减去即可.

HDFS and permission

hdfs block 的作用, 为何设置的如此之大

  • 使寻址时间远小于传输时间
  • 对大文件抽象处理

In the Apache Hadoop the default block size is 64 MB and in the Cloudera Hadoop the default is 128 MB. If block size was set to less than 64, there would be a huge number of blocks throughout the cluster, which causes NameNode to manage an enormous amount of metadata.

hdfs 数据一致性模型

  • 当前写入的block对其他reader不可见, 除非调用

hflush() This API flushes all outstanding data (i.e. the current unfinished packet) from the client into the OS buffers on all DataNode replicas(只需等待dfs.replication.min的复本数据写入完成(默认为1)).
保证被其他reader可见
hsync() This API flushes the data to the DataNodes, like hflush(), but should also force the data to underlying physical storage via fsync (or equivalent).
调用close(),相当于hsync,并关闭了stream.

hadoop-2.0之前有单点失败问题,namenode down恢复时间很久,现在有active nm1和standby nm2两台机器,both nodes communicate with a group of separate daemons called "JournalNodes",nm1有修改时 nm1写入log 并通过JN持久化,nm2读取日志做出相同的改变(日志的读写细节)

hadoop 日志

  • 可以把错误打到system.error来debug
  • 见中文版 P229

hdfs 命令

  • fsck

查看文件系统信息, hadoop fsck path -files -blocks -locations 可以查看文件对应的块信息

hadoop fsck -blockId blk_xxx 不需要后缀, 查看block属于的文件和副本情况.

疑问

  1. what is hdfs safemode?

On startup, the NameNode enters a special state called Safemode. Replication of data blocks does not occur when the NameNode is in the Safemode state. The NameNode receives Heartbeat and Blockreport messages from the DataNodes. A Blockreport contains the list of data blocks that a DataNode is hosting. Each block has a specified minimum number of replicas. A block is considered safely replicated when the minimum number of replicas of that data block has checked in with the NameNode. After a configurable percentage of safely replicated data blocks checks in with the NameNode (plus an additional 30 seconds), the NameNode exits the Safemode state. It then determines the list of data blocks (if any) that still have fewer than the specified number of replicas. The NameNode then replicates these blocks to other DataNodes.

  1. FsImage, EditLog的具体结构和原理
  2. QJM的实现原理, 和paxos算法.
  3. 详细了解zkfc的原理
  4. 如何上传大文件到hdfs?
  • flume, spark local file(解析本地文件的问题下次再看了), mapreduce
  • 可以把大文件分成多个小文件.

YARN(yer another resource negotiator)

  • AM(application master)

  • RM(resource manager)

  • containers
    每一个任务对应一个container,且只能在该container运行

  • NM(node manager)
    管理每个节点的资源和任务

  • scheduler

  1. Capacity Scheduler

保证每个organization有最少的资源分配

  1. Fair Scheduler

单个application会占满所有资源,当有更多的application会逐步分配资源,达到fair

MapReduce

  • 大致过程
    • map

      输入被分片到多个map, 每个map的输出先写到内存, 缓冲区满后会写入磁盘, 写磁盘之前会根据reducer划分为多个partition(partition数量和reducer task的数量一致, 可实现自定义的Partitioner) 每个partition中会根据key进行排序,若有combiner,则随后运行(运行条件: 溢出文件达到 min.num.spills.for.combine ), 把一个map(注意是一个map)中的输出进行reduceByKey, 能减少数据量, 然后再写到磁盘中.

    • reduce

      先把map的输出复制, 一旦有一个map的输出完成, 就进行复制, 复制完成后, 进行merge, 将多个map输出合并

  • How Many Maps?

由block的数量决定

  • how many reducers ?

The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.Increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures.

map的输入

默认的基类是FileInputFormat, 根据block size来split, 剩余的大小不足一个block size也会占据一个block, 且一个文件占据一个block; 一个split对应一个map的输入;

  • 分片大小的计算

splitSize = max(minSize, min(maxSize, blockSize))

  • small files problem

    • 问题
      • 每个文件都是一个对象, nm需要存储文件的元数据在内存中, 会占用大量的nm内存, 降低实际的存储能力.
      • 用作计算的话, 会降低性能, 花费大量的寻址时间和任务的启动和资源的释放时间.
    • 解决办法
      若输入是大量的小文件, 可以切换成CombineFileInputFormat(需要自己实现? ), SequenceFile(将多个小文件合并成一个seqFile), hadoop har 将多个小文件作为一个map的输入,
  • 多个文件一个分片(一个map处理速度非常快)
    提高分片大小

  • 超过分片大小的文件, 不想拆分
    提高分片大小; 重写isSplitAble(), 返回false

  • 一个文件作为一条record, 例如将多个小文件合并成一个顺序文件
    以上问题用spark如何处理

yarn

架构
yarn_architecture
主要参考 http://hadoop.apache.org/docs/r2.6.5/hadoop-yarn/hadoop-yarn-site/YARN.html

  • resourceManager

分为 scheduler和 ApplicationsManager 两个组件
scheduler 只负责资源的收集, 分为 CapacityScheduler and the FairScheduler, 不负责application的监控等
ApplicationsManager 负责接收 application的提交请求, 并分配AM, 并且负责监控AM的状态, 失败重启等.

  • CapacityScheduler

可继承的队列
Capacity Guarantees: 每个队列有资源的硬限制和软限制.
Security: 每个队列有独立的ACL
弹性(Elasticity) : 队列可以获取到超过其容量的资源, 如果集群有空闲资源的话

多租户(Multi-tenancy): 保证每个application, user, queue都不能独占整个集群的资源.
Runtime Configuration : 可支持运行时配置.
Drain applications : 可以控制queue的启动和停止, 停止的时候不能接受新的application.

  • nodeManager

每个节点一个, 负责监控container 的状态, 监控资源的使用情况并上报给RM

  • ApplicationMaster

每个application一个, 负责与RM协商并获取 resource container的情况, 并监控container.

问题

  • MR和spark shuffle的过程, 以及调优
    • spark shuffle见 spark笔记
    • MR shuffle
  • hive 分区分桶
  • hive ACID的支持
  • hive和 mysql的区别

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.