Coder Social home page Coder Social logo

javaweb's Introduction

javaweb's People

Contributors

www1350 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar

javaweb's Issues

如何在ajax权限判断后跳转?

经常会遇到一种场景,直接访问某些权限被拒绝后跳转登陆页面,然而ajax不会跳转
这个时候使用全局的:

  $(function(){
//全局的ajax访问,处理ajax清求时sesion超时
$.ajaxSetup({ 
    complete:function(XMLHttpRequest,textStatus){ 
    var sessionstatus=XMLHttpRequest.getResponseHeader("sessionstatus"); //通过XMLHttpRequest取得响应头,sessionstatus,
    if(sessionstatus=="timeout"){ 
        //如果超时就处理 ,指定要跳转的页面
        window.location.replace(urlconfig.url.ctx+"/login.jsp"); 
    } 
} 
})

})

在拦截器里面:

        if (httpRequest.getHeader("x-requested-with") != null
            && httpRequest.getHeader("x-requested-with").equalsIgnoreCase("XMLHttpRequest"))// 如果是ajax请求响应头会有,x-requested-with;
        {
            httpResponse.setHeader("sessionstatus", "timeout");// 在响应头设置session状态
            httpResponse.setStatus(403);
            return false;
        } else {
            httpResponse.sendRedirect(httpResponse.encodeRedirectURL("/login.jsp"));
        }

scala 学习(二)

val var

在scala里,变量被分为val、var两种类型:val指的实际是常量,var指的是变量。实际上大多数变量都是不可变的,鼓励使用val。
通常将类型放在变量后面:

val greeting : String = "hello"
还可以做多个声明
val xmax,xmin = 100

lazy 被声明为lazy的时候只有在首次取值才会初始化

lazy val st = scala.io.Source.fromFile("file").mkStringg//首次使用时候取值
val st = scala.io.Source.fromFile("file").mkStringg//定义时取
def st = scala.io.Source.fromFile("file").mkStringg//每一次使用取

算术和操作符重载

这是scala的一大便捷的地方,重载了大量的操作符。结果是你可以使用 **a.方法(b)**,也可以使用**a 方法 b**。如`1.to(10)` ` 1 to 10`
现在你甚至可以这样操作:
val x :BigInt = 137788959
x * x * x

apply
"hello"(4) 实际上是 “hello”.apply(4) 的重载

条件表达式

val s = if(x>0) 1 else “error”
好好处显而易见,声明为val常量。而且因为结果都是Any的子类,这样也是可以的,甚至可以
val s = if(x>0) 1 else ()
因为()表示Unit类型(可以理解为void)

特例
y=x=1
这里的x=1会被赋值给y,而x=1是Unit类型,类似的有{x=-1;y=2}这个块,也是Unit类型

循环

for ( i <- 表达式 )
更美好的
for ( i <- 1 to 9 ; j <- 1 to 2 if i != j) printf( i * j) 打印 2 2 3 .... 9 18
for ( i <- 1 to 9 ; j <- 1 to 2 if i != j) yield i * j 生成一个集合Vector
a.filter { _ % 2 == 0 } map { 2 * _ }

函数

下面是一个函数,(默认最后一行是返回值)

def abs (num : Int) : Int = {
    if( num< 0) -num
}

默认参数和带名参数
def dec( str : String , left :String = "[",right = "]") = left + str + right
调用的时候可以dec("hello") 、dec("hello") 、dec("hello","<<<<[”) 、dec(left="<<",str="hello",right=">>>") 、dec("hello",right="]>>>") 、
变长参数

def sum( args:Int* ) ={
var re = 0
for(e<- args) re+=e
re
}

调用:sum(1,3,5)  **sum(1 to 5 : _*)**

异常

try{
...
}catch{
case _: MalformedURLException => printf("bad:" + url )
case ex: IOException => ex.printStackTrace()

}finally{
...
}

定长、不定长数组

//对应JVM 的int []
val num = new Array[Int](10)
val s = Array("aa","vv")


//对应ArrayList
val b = ArrayBuffer[Int]()
b+=1
b+=(2,3,4,5)
b++=Array(6,7,8)
b.trimEnd(3)//移除最后3个
b.insert(2,0)//12,0,3,4,5
b.insert(2,0,9,8)//12,0,9,8,0,3,4,5
b.remove(2)//12,9,8,0,3,4,5
b.remove(2,3)//12,3,4,5


for ( i <- 0 until a.length )

//快排
scala.util.Sorting.quickSort(a)

//多维
Array.ofDim[Double](2,3)//2行3列


#8

MongoDB写日志

public class MongoFactory {
    private static  Mongo mongo;
    private static final String DB_NAME = ConfigUtil.getConfig().getDbname();
    private static final String BILLINGLOG_NAME="billingLog";
    private static ArrayList<ServerAddress> servers = ConfigUtil.getConfig().getServers();
    private static final int poolSize = ConfigUtil.getConfig().getCorePoolSize();
    private static final int threadsAllowedToBlockForConnectionMultiplie = ConfigUtil.getConfig().getThreadsAllowedToBlockForConnectionMultiplie();
    private static String username =ConfigUtil.getConfig().getUsername();
    private static String password = ConfigUtil.getConfig().getPassword();
    private MongoFactory(){
        init();
    }
    public static MongoFactory getInstance(){
        return InnerHolder.INSTANCE;
    }
    private static class InnerHolder{
        static final  MongoFactory INSTANCE = new MongoFactory();
    }

    public DBCollection getConnection(String dbName){
        if(mongo==null){
            init() ;
        }
        DB db = mongo.getDB(DB_NAME);
        if(db.authenticate(username,password.toCharArray())){
            return mongo.getDB(DB_NAME).getCollection(dbName);
        }else {
           throw new MongoException("认证失败");
        }


    }
    private void init(){
        try{
            mongo = new Mongo(servers);
            MongoOptions options = mongo.getMongoOptions();
            options.autoConnectRetry = true;
            options.connectionsPerHost= poolSize;
            options.threadsAllowedToBlockForConnectionMultiplier = threadsAllowedToBlockForConnectionMultiplie;
            options.writeConcern= WriteConcern.SAFE;
        }catch (Exception exception){
            exception.printStackTrace();

        }

    }
}

使用新版api

public class MongoFactory {
         private static MongoClient mongo;
         private static MongoDatabase db;
        private static final String DB_NAME = ConfigUtil.getConfig().getDbname();
        private static final String BILLINGLOG_NAME="billingLog";
        private static ArrayList<ServerAddress> servers = ConfigUtil.getConfig().getServers();
        private static final int poolSize = ConfigUtil.getConfig().getCorePoolSize();
        private static final int threadsAllowedToBlockForConnectionMultiplie = ConfigUtil.getConfig().getThreadsAllowedToBlockForConnectionMultiplie();
        private static String username =ConfigUtil.getConfig().getUsername();
        private static String password = ConfigUtil.getConfig().getPassword();
        private MongoFactory(){
            init();
        }
        public static MongoFactory getInstance(){
            return InnerHolder.INSTANCE;
        }
        private static class InnerHolder{
            static final  MongoFactory INSTANCE = new MongoFactory();
        }

        public MongoDatabase getConnection(String dbName){
            if(mongo==null){
                init() ;
            }
          db = mongo.getDatabase(dbName);
        return   db;
        }

        public MongoCollection<Document> getCollection(String collectName){
            getConnection(DB_NAME);
        return   db.getCollection(collectName);
        }

        private void init(){
            try{
                 MongoCredential credentials = MongoCredential.createScramSha1Credential(username, password, password.toCharArray());
                List<MongoCredential> credentialsList = new ArrayList<MongoCredential>();
                credentialsList.add(credentials);

                 MongoClientOptions options =   new MongoClientOptions.Builder().socketKeepAlive(true) // 是否保持长链接
                .connectTimeout(5000) // 链接超时时间
                .socketTimeout(5000) // read数据超时时间
                .readPreference(ReadPreference.primary()) // 最近优先策略
//              .autoConnectRetry(true) // 是否重试机制
                .connectionsPerHost(poolSize) // 每个地址最大请求数
                .maxWaitTime(1000 * 60 * 2) // 长链接的最大等待时间
                .threadsAllowedToBlockForConnectionMultiplier(threadsAllowedToBlockForConnectionMultiplie) // 一个socket最大的等待请求数
                .writeConcern(WriteConcern.SAFE).build();
//               mongo = new MongoClient(servers,options);
                 mongo = new MongoClient(servers,credentialsList,options);
            }catch (Exception exception){
                exception.printStackTrace();

            }

        }
}

HttpURLConnection

设置连接参数的方法
setAllowUserInteraction
setDoInput
setDoOutput
setIfModifiedSince
setUseCaches
setDefaultAllowUserInteraction
setDefaultUseCaches
设置请求头或响应头
HTTP请求允许一个key带多个用逗号分开的values,但是HttpURLConnection只提供了单个操作的方法:

setRequestProperty(key,value)
addRequestProperty(key,value)
setRequestProperty和addRequestProperty的区别就是,setRequestProperty会覆盖已经存在的key的所有values,有清零重新赋值的作用。而addRequestProperty则是在原来key的基础上继续添加其他value。

发送URL请求
建立实际连接之后,就是发送请求,把请求参数传到服务器,这就需要使用outputStream把请求参数传给服务器:

getOutputStream
获取响应
请求发送成功之后,即可获取响应的状态码,如果成功既可以读取响应中的数据,获取这些数据的方法包括:

getContent
getHeaderField
getInputStream
对于大部分请求来说,getInputStream和getContent是用的最多的。

相应的信息头用以下方法获取:

getContentEncoding
getContentLength
getContentType
getDate
getExpiration
getLastModifed

        URL url = new URL(path);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();

        // 设置http协议post方式提交请求的必须属性
        conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
        conn.setRequestProperty("Content-Length", String.valueOf(entityData.length));

        conn.setConnectTimeout(5 * 1000);

           // 设置是否向httpUrlConnection输出,因为这个是post请求,参数要放在
            // http正文内,因此需要设为true, 默认情况下是false;
            httpUrlConn.setDoOutput(true);
            // 设置是否从httpUrlConnection读入,默认情况下是true
            httpUrlConn.setDoInput(true);
            // Post 请求不能使用缓存
            httpUrlConn.setUseCaches(false);


        OutputStream outputStream = conn.getOutputStream();
        outputStream.write(entityData);
        outputStream.close();

setConnectTimeout:设置连接主机超时(单位:毫秒)
setReadTimeout:设置从主机读取数据超时(单位:毫秒)

netty入门到精通

netty3.x

入门

public void run() {
    // Configure the server.
    ServerBootstrap bootstrap = new ServerBootstrap(
            new NioServerSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool()));

    // Set up the pipeline factory.
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() throws Exception {
            return Channels.pipeline(new EchoServerHandler());
        }
    });
         bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
....
    // Bind and start to accept incoming connections.
    bootstrap.bind(new InetSocketAddress(port));
}

业务代码

public class EchoServerHandler extends SimpleChannelUpstreamHandler {

    @Override
    public void messageReceived(
            ChannelHandlerContext ctx, MessageEvent e) {
        // Send back the received message to the remote peer.
        e.getChannel().write(e.getMessage());
    }
}

精通
手册:http://netty.io/3.7/guide/

netty4.x

入门
服务端

     /**
     * 服务端监听的端口地址
     */
    private static final int portNumber = 7878;

  EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup);
        b.channel(NioServerSocketChannel.class);
        b.childHandler(new HelloServerInitializer());

        // 服务器绑定端口监听
        ChannelFuture f = b.bind(portNumber).sync();

        System.out.println("init server");

        // 监听服务器关闭监听
        f.channel().closeFuture().sync();

        // 可以简写为
        /* b.bind(portNumber).sync().channel().closeFuture().sync(); */
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
public class HelloServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        // 以("\n")为结尾分割的 解码器
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));

        // 字符串解码 和 编码
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        // 自己的逻辑Handler
        pipeline.addLast("handler", new HelloServerHandler());
    }
}
public class HelloServerHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 收到消息直接打印输出
        System.out.println(ctx.channel().remoteAddress() + " Say : " + msg);

        // 返回客户端消息 - 我已经接收到了你的消息
        ctx.writeAndFlush("我Received your message "+msg+"!\n");
    }

    /*
     * 
     * 覆盖 channelActive 方法 在channel被启用的时候触发 (在建立连接的时候)
     * 
     * channelActive 和 channelInActive 在后面的内容中讲述,这里先不做详细的描述
     * */
/*    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !");

        ctx.writeAndFlush( "Welcome to " + InetAddress.getLocalHost().getHostName() + " service!\n");

        super.channelActive(ctx);
    }*/
}

客户端

    public static String host = "127.0.0.1";
    public static int port = 7878;

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
            .channel(NioSocketChannel.class)
            .handler(new HelloClientInitializer());

            // 连接服务端
            Channel ch = b.connect(host, port).sync().channel();
            System.out.println("init client");
            // 控制台输入
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            for (;;) {
                String line = in.readLine();
                if (line == null) {
                    continue;
                }
                /*
                 * 向服务端发送在控制台输入的文本 并用"\r\n"结尾
                 * 之所以用\r\n结尾 是因为我们在handler中添加了 DelimiterBasedFrameDecoder 帧解码。
                 * 这个解码器是一个根据\n符号位分隔符的解码器。所以每条消息的最后必须加上\n否则无法识别和解码
                 * */
                ch.writeAndFlush(line + "\r\n");
            }
        } finally {
            // The connection is closed automatically on shutdown.
            group.shutdownGracefully();
        }
public class HelloClientHandler extends SimpleChannelInboundHandler<String>{

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 收到消息直接打印输出
        System.out.println(ctx.channel().remoteAddress() + " Say : " + msg);

    }
}
public class HelloClientInitializer extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
          ChannelPipeline pipeline = ch.pipeline();

            // 以("\n")为结尾分割的 解码器
            pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));

            // 字符串解码 和 编码
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("encoder", new StringEncoder());

            // 自己的逻辑Handler
            pipeline.addLast("handler", new HelloClientHandler());
    }

}
  • 一个EventLoopGroup包含一个或多个EventLoop
  • 一个EventLoop在它生命周期只和一个Thread绑定
  • 所有EventLoop处理的I/O事件都将在专有的Thread上处理
  • 一个channel在生命周期只注册于一个EventLoop
  • 一个EventLoop可能会分配给一个或多个channel

Channel

  • 线程安全
  • write:数据写到远程结点。数据传给channelpipeline排队直到被冲刷
  • flush:已写数据冲刷到传输底层socket
  • 内置的传输:
  1. NIO/io.netty.channel.socket.nio/基于选择器
  2. Epoll/io.netty.channel.epoll/JNI驱动的epoll和非阻塞IO
  3. OIO/io.netty.channel.socket.oio/java.net基础/阻塞流
  4. Local/io.netty.channel.local/VM内部通过管道进行通信/本地传输
  5. Embedded/io.netty.channel.embedded/Embedded,ChannelHandler不需经过网络

Channel生命周期

  • ChannelUnregistered channel已创建,但还没注册到EventLoop
  • ChannelRegistered 注册到EventLoop
  • ChannelActive channel处于活动状态(已连接到远程结点),可以接收和发送数据
  • ChannelInactive 没有连接到远程结点
    image

ChannelHandler生命周期

  • handlerAdded 当ChannelHandler被添加到一个ChannelPipeline时被调用
  • handlerRemoved 当ChannelHandler从一个ChannelPipeline中移除时被调用
  • exceptionCaught 处理过程中ChannelPipeline中发生错误时被调用

ChannelInboundHandler——处理输入数据和所有类型的状态变化

方法:
image

类型 描述
channelRegistered 当一个Channel注册到EventLoop上,可以处理I/O时被调用
channelUnregistered 当一个Channel从它的EventLoop上解除注册,不再处理I/O时被调用
channelActive 当Channel变成活跃状态时被调用;Channel是连接/绑定、就绪的
channelInactive 当Channel离开活跃状态,不再连接到某个远端时被调用
channelReadComplete 当Channel上的某个读操作完成时被调用
channelRead 当从Channel中读数据时被调用
channelWritabilityChanged 当Channel的可写状态改变时被调用。通过这个方法,用户可以确保写操作不会进行地太快(避免OutOfMemoryError)或者当Channel又变成可写时继续写操作。Channel类的isWritable()方法可以用来检查Channel的可写状态。可写性的阈值可以通过Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()来设定。
userEventTriggered 因某个POJO穿过ChannelPipeline引发ChannelnboundHandler.fireUserEventTriggered()时被调用

当一个ChannelInboundHandler实现类重写channelRead()方法时,它要负责释放ByteBuf相关的内存

public class DiscardHandler extends ChannelInboundHandlerAdapter {  
    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
        //手动释放消息  
        ReferenceCountUtil.release(msg);  
    }  
}  

一个更简单的替代方法就是用SimpleChannelInboundHandler

public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {  
        //不需要手动释放  
    }  
}  
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
...
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

 protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
}
...

ChannelOutboundHandler——处理输出数据,可以拦截所有操作

类型 描述
bind(ChannelHandlerContext,SocketAddress,ChannelPromise) 请求绑定Channel到一个本地地址
connect(ChannelHandlerContext, SocketAddress,SocketAddress,ChannelPromise) 请求连接Channel到远端
disconnect(ChannelHandlerContext, ChannelPromise) 请求从远端断开Channel
close(ChannelHandlerContext,ChannelPromise) 请求关闭Channel
deregister(ChannelHandlerContext, ChannelPromise) 请求Channel从它的EventLoop上解除注册
read(ChannelHandlerContext) 请求从Channel中读更多的数据
flush(ChannelHandlerContext) 请求通过Channel刷队列数据到远端
write(ChannelHandlerContext,Object, ChannelPromise) 请求通过Channel写数据到远端

CHANNELPROMISE VS. CHANNELFUTURE
ChannelOutboundHandler的大部分方法都用了一个ChannelPromise输入参数,用于当操作完成时收到通知。ChannelPromise是ChannelFuture的子接口,定义了可写的方法,比如setSuccess(),或者setFailure(),而ChannelFuture则是不可变对象。

ChannelHandler适配器类

image

资源管理

无论何时你对数据操作ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write(),你需要确保没有资源泄露。也许你还记得上一章我们提到过,Netty采用引用计数来处理ByteBuf池。所以,在你用完一个ByteBuf后,调整引用计数的值是很重要的。

为了帮助你诊断潜在的问题, Netty提供了ResourceLeakDetector类,它通过采样应用程序1%的buffer分配来检查是否有内存泄露。这个过程的开销是很小的。

如果泄露被检测到,会产生类似下面这样的日志消息:

LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable
advanced leak reporting to find out where the leak occurred. To enable
advanced leak reporting, specify the JVM option
'-Dio.netty.leakDetectionLevel=ADVANCED' or call
ResourceLeakDetector.setLevel().

级别 描述
DISABLED 关闭内存泄露检测。 只有在大量测试后,才能用这个级别
SIMPLE 报告默认的1%采样率中发现的任何泄露。这是默认的级别,在大部分情况下适用
ADVANCED 报告发现的泄露和消息的位置。使用默认的采样率。
PARANOID 类似ADVANCED级别,但是每个消息的获取都被检测采样。这对性能有很大影响,只能在调试阶段使用。

用上表中的某个值来配置下面这个Java系统属性,就可以设定内存泄露检测级别:

java -Dio.netty.leakDetectionLevel=ADVANCED

如果你设定这个JVM选项然后重启你的应用,你会看到应用中泄露buffer的最新位置。下面是一个单元测试产生的典型的内存泄露报告:

Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK:
ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 1
#1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString(
AdvancedLeakAwareByteBuf.java:697)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(
XmlFrameDecoderTest.java:157)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(
XmlFrameDecoderTest.java:133)

在你实现ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()时,你怎样用这个诊断工具来防止内存泄露呢?让我们来看下ChannelRead()操作“消费(consume)”输入数据这个情况:就是说,当前handler没有通过ChannelContext.fireChannelRead()把消息传递到下一个ChannelInboundHandler。下面的代码说明了如何释放这条消息占用的内存。

public class DiscardInboundHandler extends ChannelInboundHandlerAdapter {  
    @Override  
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
        ReferenceCountUtil.release(msg);  
    }  
}  

public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter {  
    @Override  
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  
        ReferenceCountUtil.release(msg);  
        promise.setSuccess();  
    }  
}  

重要的是,不仅要释放资源,而且要通知ChannelPromise,否则会出现某个ChannelFutureListener没有被通知到消息已经被处理的情况。

总之,如果一个消息被“消费”或者丢弃,没有送到ChannelPipeline中的下一个ChannelOutboundHandler,用户就要负责调用ReferenceCountUtil.release()。如果消息到达了真正的传输层,在它被写到socket中或者Channel关闭时,会被自动释放(这种情况下用户就不用管了)。

ChannelPipeline接口

https://segmentfault.com/a/1190000007308934
如果你把一个ChannelPipeline看成是一串ChannelHandler实例,拦截穿过Channel的输入输出event,那么就很容易明白这些ChannelHandler的交互是如何构成了一个应用程序数据和事件处理逻辑的核心。

每个新创建的Channel都会分配一个新的ChannelPipeline。这个关系是恒定的;Channel不可以换别ChannelPipeline,也不可以解除掉当前分配的ChannelPipeline。在Netty组件的整个生命周期中这个关系是固定的,不需要开发者采取什么操作。

根据来源,一个event可以被一个ChannelInboundHandler或者ChannelOutboundHandler处理。接下来,通过调用ChannelHandlerContext的方法,它会被转发到下一个同类型的handler。

image

方法名 描述
fireChannelRegistered 调用ChannelPipeline中下一个ChannelInboundHandler的channelRegistered(ChannelHandlerContext)
fireChannelUnregistered 调用ChannelPipeline中下一个ChannelInboundHandler的channelUnRegistered(ChannelHandlerContext)
fireChannelActive 调用ChannelPipeline中下一个ChannelInboundHandler的channelActive(ChannelHandlerContext)
fireChannelInactive 调用ChannelPipeline中下一个ChannelInboundHandler的channelInactive(ChannelHandlerContext)
fireExceptionCaught 调用ChannelPipeline中下一个ChanneHandler的exceptionCaught(ChannelHandlerContext,Throwable)
fireUserEventTriggered 调用ChannelPipeline中下一个ChannelInboundHandler的userEventTriggered(ChannelHandlerContext, Object)
fireChannelRead 调用ChannelPipeline中下一个ChannelInboundHandler的channelRead(ChannelHandlerContext, Object msg)
fireChannelReadComplete 调用ChannelPipeline中下一个ChannelStateHandler的channelReadComplete(ChannelHandlerContext)
方法名 描述
bind 绑定Channel到一个本地地址。这会调用ChannelPipeline中下一个ChannelOutboundHandler的bind(ChannelHandlerContext, SocketAddress, ChannelPromise)
connect 连接Channel到一个远端地址。这会调用ChannelPipeline中下一个ChannelOutboundHandler的connect(ChannelHandlerContext, SocketAddress, ChannelPromise)
disconnect 断开Channel。这会调用ChannelPipeline中下一个ChannelOutboundHandler的disconnect(ChannelHandlerContext, ChannelPromise)
close 关闭Channel。这会调用ChannelPipeline中下一个ChannelOutboundHandler的close(ChannelHandlerContext,ChannelPromise)
deregister Channel从它之前分配的EventLoop上解除注册。这会调用ChannelPipeline中下一个ChannelOutboundHandler的deregister(ChannelHandlerContext, ChannelPromise)
flush 刷所有Channel待写的数据。这会调用ChannelPipeline中下一个ChannelOutboundHandler的flush(ChannelHandlerContext)
write 往Channel写一条消息。这会调用ChannelPipeline中下一个ChannelOutboundHandler的write(ChannelHandlerContext, Object msg, ChannelPromise)   注意:不会写消息到底层的Socket,只是排队等候。如果要写到Socket中,调用flush()或者writeAndFlush()
writeAndFlush 这是先后调用write()和flush()的便捷方法。
read 请求从Channel中读更多的数据。这会调用ChannelPipeline中下一个ChannelOutboundHandler的read(ChannelHandlerContext)

ChannelHandlerContext接口

ChannelHandlerContext代表了一个ChannelHandler和一个ChannelPipeline之间的关系,它在ChannelHandler被添加到ChannelPipeline时被创建。ChannelHandlerContext的主要功能是管理它对应的ChannelHandler和属于同一个ChannelPipeline的其他ChannelHandler之间的交互。

ChannelHandlerContext有很多方法,其中一些方法Channel和ChannelPipeline也有,但是有些区别。如果你在Channel或者ChannelPipeline实例上调用这些方法,它们的调用会穿过整个pipeline。而在ChannelHandlerContext上调用的同样的方法,仅仅从当前ChannelHandler开始,走到pipeline中下一个可以处理这个event的ChannelHandler。

方法名 描述
bind 绑定到给定的SocketAddress,返回一个ChannelFuture
channel 返回绑定的Channel
close 关闭Channel,返回一个ChannelFuture
connect 连接到给定的SocketAddress,返回一个ChannelFuture
deregister 从先前分配的EventExecutor上解除注册,返回一个ChannelFuture
disconnect 从远端断开,返回一个ChannelFuture
executor 返回分发event的EventExecutor
fireChannelActive 触发调用下一个ChannelInboundHandler的channelActive()(已连接)
fireChannelInactive 触发调用下一个ChannelInboundHandler的channelInactive()(断开连接)
fireChannelRead 触发调用下一个ChannelInboundHandler的channelRead()(收到消息)
fireChannelReadComplete 触发channelWritabilityChanged event到下一个ChannelInboundHandler
handler 返回绑定的ChannelHandler
isRemoved 如果绑定的ChannelHandler已从ChannelPipeline中删除,返回true
name 返回本ChannelHandlerContext 实例唯一的名字
Pipeline 返回绑定的ChannelPipeline
read 从Channel读数据到第一个输入buffer;如果成功,触发一条channelRead event,通知handler channelReadComplete
write 通过本ChannelHandlerContext写消息穿过pipeline
在使用ChannelHandlerContext API时,请牢记下面几点:
  • 一个ChannelHandler绑定的ChannelHandlerContext 永远不会改变,所以把它的引用缓存起来是安全的。
  • 像我们在这节刚开始解释过的,ChannelHandlerContext的一些方法和其他类(Channel和ChannelPipeline)的方法名字相似,但是ChannelHandlerContext的方法采用了更短的event传递路程。我们应该尽可能利用这一点来实现最好的性能。

异常出站

1.添加ChannelFutureListener就是为了在ChannelFuture实例上调用addListener(ChannelFutureListener)方法,有两种方法可以做到这个。最常用的方法是在输出操作(比如write())返回的ChannelFuture上调用addListener()。

ChannelFuture future = channel.write(...);
 future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (!f.isSuccess()) {
                        f.cause().printStackTrace();
                        r.channel().close();
                    } 
                }
            });

2.添加一个ChannelFutureListener到ChannelPromise,然后将这个ChannelPromise作为参数传入ChannelOutboundHandler方法。下面的代码和前一段代码有相同的效果。

public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        promise.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) {
                if (!f.isSuccess()) {
                    f.cause().printStackTrace();
                    f.channel().close();
                }
            }
        });
    }
}

ByteBuf

image

  1. reader index前面的数据是已经读过的数据,这些数据可以扔掉
  2. 从reader index开始,到writer index之前的数据是可读数据
  3. 从writer index开始,为可写区域
    正是因为这样的设计,ByteBuf可以同时读写数据(只要可读区域和可写区域都还有空闲空间),而java.nio.ByteBuffer则必须调用flip()方法才能从写状态切换到读状态。

ByteBufAllocator

ByteBufAllocator byteBufAllocator = channel.alloc();
        //        byteBufAllocator.compositeBuffer();
        //        byteBufAllocator.buffer();
ByteBuf byteBuf = byteBufAllocator.directBuffer();

image
image

UnpooledByteBufAllocator:池化了ByteBuf并最大限度减少内存碎片。使用jemalloc(https://www.cnblogs.com/gaoxing/p/4253833.html)
PooledByteBufAllocator:不池化,每次调用返回新实例

Unpooled

创建未池化ByteBuf

ByteBufUtil类

  • hexdump 十六进制形式打印ByteBuf内容
  • equals 判断两个ByteBuf相等

Netty系列之Netty高性能之道

Netty系列之Netty线程模型

ThreadPoolExecutor

线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler)

  • corePoolSize: 线程池维护线程的最少数量

  • maximumPoolSize:线程池维护线程的最大数量

  • keepAliveTime: 线程池维护线程所允许的空闲时间

  • unit: 线程池维护线程所允许的空闲时间的单位

  • workQueue: 线程池所使用的缓冲队列

  • handler: 线程池对拒绝任务的处理策略

    一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是Runnable类型对象的run()方法。

当一个任务通过execute(Runnable)方法欲添加到线程池时:

  1. 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
  2. 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
  3. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
  4. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
  5. 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

handler有四个选择:
ThreadPoolExecutor.AbortPolicy()
抛出java.util.concurrent.RejectedExecutionException异常

ThreadPoolExecutor.CallerRunsPolicy()

当抛出RejectedExecutionException异常时,会调用rejectedExecution方法
(如果主线程没有关闭,则主线程调用run方法,源码如下

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
)
ThreadPoolExecutor.DiscardOldestPolicy()

抛弃旧的任务

ThreadPoolExecutor.DiscardPolicy()

抛弃当前的任务

下面是一个用线程池做日志记录的例子

     /**
     * 线程池
     */
    private static ThreadPoolExecutor producerPool;
    static {
        // 构造一个线程池
        producerPool = new ThreadPoolExecutor(ConfigUtil.getConfig().getCorePoolSize(), ConfigUtil.getConfig().getMaximumPoolSize(), ConfigUtil.getConfig().getKeepAliveTime(),
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());
    }

    /**
     * 写入日志
     *
     * @param t
     * @param <T>
     */
    public <T extends BaseLog> void log(T t) {
            producerPool.execute(new ServiceLogTask(t));
    }

    // 异步线程执行写入日志操作
    class ServiceLogTask implements Runnable, Serializable {
        private BaseLog log;
        public ServiceLogTask(BaseLog log) {
            if (log != null) {
                this.log = log;
            } else {
                throw new IllegalArgumentException("参数不可为空");
            }
        }

        public void run() {
            try {
                insertLog();
            } catch (Exception e) {
                //将失败的数据保存到Log4J,便于之后再处理
                log.setDec(e.getMessage());
                logger.error(JSONParser.convertObjectToJson(log));
            }
        }

        public void insertLog() {
            String switchValue = haveServiceLogSwitch(this.log.getLogCategory());
            if (StringUtils.isEmpty(switchValue) || SWITCH_ON.equalsIgnoreCase(switchValue)) {
               //修改为客户端直接存入数据
               String jsonLogString = JSONParser.convertObjectToJson(this.log);
                DBObject dbObject = (DBObject)JSON.parse(jsonLogString);
                String type = dbObject.get(LOG_TYPE_FIELD) == null ? "" : dbObject.get(LOG_TYPE_FIELD).toString();
                LogType logType = LogType.valueOf(type);
                if(logType!=null){
                    dbObject.removeField(LOG_TYPE_FIELD) ;
                    MongoFactory.getInstance().getConnection(logType.getCollectionName()).save(dbObject);
                }

            }
        }
public class ConfigUtil {
    private static final ConfigUtil config = new ConfigUtil();

    /**
     * 线程池维护线程的最少数量
     */
    private int corePoolSize;

    /**
     * 线程池维护线程的最大数量
     */
    private int maximumPoolSize;

    /**
     * 线程池维护线程所允许的空闲时间(s)
     */
    private long keepAliveTime;

    /**
     * 数据库对应的名字
     */
    private String dbname ;

    /**
     * mongodb维护的每个主机的最大连接数,Mongo类中已经实现
     */
    private int connectionsPerHost;

    /**
     * mongo维护的可等待的线程倍数
     */
    private int threadsAllowedToBlockForConnectionMultiplie;


    /**
     * 添加支持集群的模式
     */
    private ArrayList<ServerAddress> servers;

    /**
     * 用户名
     */
    private String username;

    /**
     * 密码
     */
    private String password;
    private ConfigUtil() {
        //初始化配置读取
        Properties configMessage = new Properties();
        try {
            configMessage.load(ConfigUtil.class.getResourceAsStream("/conf/logConfig.properties"));
            servers=loadAddress(configMessage);
            corePoolSize = getInt(configMessage, "corePoolSize", 5);
            maximumPoolSize = getInt(configMessage, "maximumPoolSize", 100);
            keepAliveTime = getLong(configMessage, "keepAliveTime", 100);
            dbname  = getString(configMessage,"mongo.dbname");
            connectionsPerHost = getInt(configMessage,"mongo.connectionsPerHost",100);
            threadsAllowedToBlockForConnectionMultiplie = getInt(configMessage,"mongo.threadsAllowedToBlockForConnectionMultiplier",20);
            username=getString(configMessage,"mongo.username") ;
            password=getString(configMessage,"mongo.password");
        } catch (Exception e) {
            throw new RuntimeException("读取配置文件logConfig.properties错误", e);
        }
    }

    public static ConfigUtil getConfig() {
        return config;
    }

    public int getCorePoolSize() {
        return corePoolSize;
    }

    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    public long getKeepAliveTime() {
        return keepAliveTime;
    }

    public String getDbname() {
        return dbname;
    }
    private static long getLong(Properties config, String key, long defaultValue) {
        String value = config.getProperty(key);
        if (StringUtils.isNotEmpty(value)) {
            return Long.valueOf(value);
        }
        return defaultValue;
    }

    private static int getInt(Properties config, String key, int defaultValue) {
        String value = config.getProperty(key);
        if (StringUtils.isNotEmpty(value)) {
            return Integer.valueOf(value);
        }
        return defaultValue;
    }

    private static String getString(Properties config, String key) {
        String value = config.getProperty(key);
        if (StringUtils.isNotEmpty(value)) {
            return value;
        }
        throw new RuntimeException(key + "值不存在");
    }
    private static ArrayList<ServerAddress> loadAddress(Properties config){
        ArrayList<ServerAddress> servers=new ArrayList<ServerAddress>();
        String value = config.getProperty("mongo.host");
        if(StringUtils.isNotEmpty(value)){
          String[] hosts = value.split(",");
          for(int i=0;i<hosts.length;i++){
              try{
                  String[] message = hosts[i].split(":");
                  String ip = message[0];
                  int port =Integer.parseInt(message[1]);
                  ServerAddress serverAddress = new ServerAddress(ip,port);
                  servers.add(serverAddress);
              } catch (Exception e){
                  e.printStackTrace();
                  throw new RuntimeException("解析mongodb地址部分失败")  ;
              }
          }
          return servers;
        }
        throw  new RuntimeException("mongo.host配置错误,请检查");
    }

    public int getConnectionsPerHost() {
        return connectionsPerHost;
    }

    public int getThreadsAllowedToBlockForConnectionMultiplie() {
        return threadsAllowedToBlockForConnectionMultiplie;
    }

    public ArrayList<ServerAddress> getServers() {
        return servers;
    }

    public String getUsername() {
        return username;
    }

    public String getPassword() {
        return password;
    }
}

tasks :每秒的任务数,假设为500~1000
taskcost:每个任务花费时间,假设为0.1s
responsetime:系统允许容忍的最大响应时间,假设为1s
做几个计算
corePoolSize = 每秒需要多少个线程处理?
threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50
根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可
queueCapacity = (coreSizePool/taskcost)responsetime
计算可得 queueCapacity = 80/0.1
1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行
切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
计算可得 maxPoolSize = (1000-80)/10 = 92
(最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数
rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理 (https://my.oschina.net/u/169390/blog/97415)
keepAliveTime和allowCoreThreadTimeout采用默认通常能满足

Intellij+ JRebel +maven+jetty实现热部署

pom.xml

            <plugin>
                <groupId>org.mortbay.jetty</groupId>
                <artifactId>jetty-maven-plugin</artifactId>
                <configuration>
                    <scanIntervalSeconds>1</scanIntervalSeconds>
                    <reload>automatic</reload>
                    <stopPort>9966</stopPort>
                    <stopKey>foo</stopKey>
                    <contextXml>${project.basedir}/src/main/resources/jetty-context.xml</contextXml>
                    <connectors>
                        <connector implementation="org.eclipse.jetty.server.nio.SelectChannelConnector">
                            <port>8080</port>
                            <maxIdleTime>60000</maxIdleTime>
                        </connector>
                    </connectors>
                    <webAppSourceDirectory>${basedir}/WebRoot</webAppSourceDirectory>
                    <webAppConfig>
                        <contextPath>/absurd</contextPath>
                    </webAppConfig>
                </configuration>
            </plugin>

关于这部分的参数,详见链接

安装jrebel以及破解见
IDEA破解
jrebel破解

qq 20160909115113
qq 20160909115148

${project.basedir}/src/main/resources/jetty-context.xml


<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Configure PUBLIC "-//Mort Bay Consulting//DTD Configure//EN" "http://www.eclipse.org/jetty/configure.dtd">
<Configure class="org.eclipse.jetty.webapp.WebAppContext">
    <Call name="setAttribute">
        <Arg>org.eclipse.jetty.server.webapp.WebInfIncludeJarPattern</Arg>
        <Arg>.*/.*jsp-api-[^/]\.jar$|./.*jsp-[^/]\.jar$|./.*taglibs[^/]*\.jar$</Arg>
    </Call>
</Configure>

ctrl+shift+f9编译当前class
ctrl+f9编译全部

使用gradle构建一个web项目

安装过程略去,配置下GRADLE_HOME和GRADLE_HOME\bin

  1. 创建一个空目录,新建build.gradle
apply plugin: 'idea'
apply plugin: 'java' 
apply plugin: 'war'
sourceCompatibility = 1.7

repositories {
  mavenCentral()
}

dependencies {
  compile 'org.springframework.boot:spring-boot-starter-web:1.3.5.RELEASE'
  compile 'log4j:log4j:1.2.17'
}


task createJavaProject << { 
  sourceSets*.java.srcDirs*.each { it.mkdirs() } 
  sourceSets*.resources.srcDirs*.each { it.mkdirs()} 
} 

task createWebProject(dependsOn: 'createJavaProject') << { 
  def webAppDir = file("$webAppDirName") 
  webAppDir.mkdirs() 
} 

2.gradle idea
3.gradle createWebProject
3.gradle build

mongodb

http://www.mongodb.org/downloads

mongod.exe --dbpath

创建数据库

use DATABASE_NAME

删除数据库

db.dropDatabase()

创建集合

db.createCollection(name, options)

options:
项​​参数是可选的,所以只需要到指定的集合名称。以下是可以使用的选项列表:

字段 类型 描述
capped Boolean (可选)如果为true,则启用封顶集合。封顶集合是固定大小的集合,会自动覆盖最早的条目,当它达到其最大大小。如果指定true,则需要也指定尺寸参数。
autoIndexID Boolean (可选)如果为true,自动创建索引_id字段的默认值是false。
size number (可选)指定最大大小字节封顶集合。如果封顶如果是 true,那么你还需要指定这个字段。
max number (可选)指定封顶集合允许在文件的最大数量。

删除集合

db.COLLECTION_NAME.drop()

数据类型:

  • String : 这是最常用的数据类型来存储数据。在MongoDB中的字符串必须是有效的UTF-8。
  • Integer : 这种类型是用来存储一个数值。整数可以是32位或64位,这取决于您的服务器。
  • Boolean : 此类型用于存储一个布尔值 (true/ false) 。
  • Double : 这种类型是用来存储浮点值。
  • Min/ Max keys : 这种类型被用来对BSON元素的最低和最高值比较。
  • Arrays : 使用此类型的数组或列表或多个值存储到一个键。
  • Timestamp : 时间戳。这可以方便记录时的文件已被修改或添加。
  • Object : 此数据类型用于嵌入式的文件。
  • Null : 这种类型是用来存储一个Null值。
  • Symbol : 此数据类型用于字符串相同,但它通常是保留给特定符号类型的语言使用。
  • Date : 此数据类型用于存储当前日期或时间的UNIX时间格式。可以指定自己的日期和时间,日期和年,月,日到创建对象。
  • Object ID : 此数据类型用于存储文档的ID。
  • Binary data : 此数据类型用于存储二进制数据。
  • Code : 此数据类型用于存储到文档中的JavaScript代码。
  • Regular expression : 此数据类型用于存储正则表达式

基本操作:

db.COLLECTION_NAME.insert(document)
db.COLLECTION_NAME.find()

db.mycol.find().pretty()//格式化
db.mycol.find({key1:value1, key2:value2}).pretty()
db.mycol.find(
   {
      $or: [
         {key1: value1}, {key2:value2}
      ]
   }
).pretty()

db.mycol.find("likes": {$gt:10}, $or: [{"by": "yiibai"}, {"title": "MongoDB Overview"}] }).pretty()
// 'where likes>10 AND (by = 'yiibai' OR title = 'MongoDB Overview')'
db.COLLECTION_NAME.update(SELECTIOIN_CRITERIA, UPDATED_DATA)
db.COLLECTION_NAME.save({_id:ObjectId(),NEW_DATA})
db.COLLECTION_NAME.remove(DELLETION_CRITTERIA)
db.COLLECTION_NAME.remove(DELETION_CRITERIA,1)
db.mycol.find({},{"title":1,_id:0})//1表示显示该字段0不显示
db.COLLECTION_NAME.find().limit(NUMBER)
db.COLLECTION_NAME.find().limit(NUMBER).skip(NUMBER)
db.COLLECTION_NAME.find().sort({KEY:1})

db.COLLECTION_NAME.aggregate(AGGREGATE_OPERATION)

操作 语法 例子 RDBMS 等同
Equality {:} db.mycol.find({"by":"tutorials yiibai"}).pretty() where by = 'tutorials yiibai'
Less Than {:{$lt:}} db.mycol.find({"likes":{$lt:50}}).pretty() where likes < 50
Less Than Equals {:{$lte:}} db.mycol.find({"likes":{$lte:50}}).pretty() where likes <= 50
Greater Than {:{$gt:}} db.mycol.find({"likes":{$gt:50}}).pretty() where likes > 50
Greater Than Equals {:{$gte:}} db.mycol.find({"likes":{$gte:50}}).pretty() where likes >= 50
Not Equals {:{$ne:}} db.mycol.find({"likes":{$ne:50}}).pretty() where likes != 50

db.COLLECTION_NAME.ensureIndex({KEY:1})//索引

ensureIndex() 方法也可以接受的选项列表(可选),其下面给出的列表:

参数 类型 描述
background Boolean 在后台建立索引,以便建立索引并不能阻止其他数据库活动。指定true建立在后台。默认值是 false.
unique Boolean 创建唯一索引,以便收集不会接受插入索引键或键匹配现有的值存储在索引文档。指定创建唯一索引。默认值是 false.
name string 索引的名称。如果未指定,MongoDB中都生成一个索引名索引字段的名称和排序顺序串联.
dropDups Boolean 创建一个唯一索引的字段,可能有重复。 MongoDB的索引只有第一次出现的一个键,从集合中删除的所有文件包含该键的后续出现的。指定创建唯一索引。默认值是 false.
sparse Boolean 如果为true,指数只引用文档指定的字段。这些索引使用更少的空间,但在某些情况下,特别是各种不同的表现。默认值是 false.
expireAfterSeconds integer 指定一个值,以秒为TTL控制多久MongoDB的文档保留在此集合.
v index version 索引版本号。默认的索引版本取决于mongodb 运行的版本在创建索引时.
weights document 权重是从1到99999范围内的数,表示该字段的意义,相对于其他的索引字段分数.
default_language string 对于文本索引时,决定停止词和词干分析器和标记生成规则列表的语言。默认值是 english.
language_override string 对于文本索引时,指定的名称在文档中包含覆盖默认的语言,语言字段中。默认值是语言。

db.mycol.aggregate([{$group : {_id : "$by_user", num_tutorial : {$sum : 1}}}])

表达式 描述 实例
$sum 总结从集合中的所有文件所定义的值. db.mycol.aggregate([{$group : {_id : "$by_user", num_tutorial : {$sum : "$likes"}}}])
$avg 从所有文档集合中所有给定值计算的平均. db.mycol.aggregate([{$group : {_id : "$by_user", num_tutorial : {$avg : "$likes"}}}])
$min 获取集合中的所有文件中的相应值最小. db.mycol.aggregate([{$group : {_id : "$by_user", num_tutorial : {$min : "$likes"}}}])
$max 获取集合中的所有文件中的相应值的最大. db.mycol.aggregate([{$group : {_id : "$by_user", num_tutorial : {$max : "$likes"}}}])
$push 值插入到一个数组生成文档中. db.mycol.aggregate([{$group : {_id : "$by_user", url : {$push: "$url"}}}])
$addToSet 值插入到一个数组中所得到的文档,但不会创建重复. db.mycol.aggregate([{$group : {_id : "$by_user", url : {$addToSet : "$url"}}}])
$first 根据分组从源文档中获取的第一个文档。通常情况下,这才有意义,连同以前的一些应用 “$sort”-stage. db.mycol.aggregate([{$group : {_id : "$by_user", first_url : {$first : "$url"}}}])
$last 根据分组从源文档中获取最后的文档。通常,这才有意义,连同以前的一些应用 “$sort”-stage. db.mycol.aggregate([{$group : {_id : "$by_user", last_url : {$last : "$url"}}}])

设置一个副本集
在本教程中,我们将mongod实例转换成独立的副本集。要转换到副本设置遵循以下步骤:

关闭停止已经运行的MongoDB服务器。
现在启动MongoDB服务器通过指定 --replSet 选项。 --replSet 基本语法如下:

mongod --port "PORT" --dbpath "YOUR_DB_DATA_PATH" --replSet 

"REPLICA_SET_INSTANCE_NAME"
例子

mongod --port 27017 --dbpath "D:set upmongodbdata" --replSet rs0

它会启动一个mongod 实例名称rs0 ,端口为27017。启动命令提示符 rs.initiate(),并连接到这个mongod实例。在mongod客户端执行命令rs.initiate()启动一个新的副本集。要检查副本集的配置执行命令rs.conf()。要检查的状态副本sete执行命令:rs.status()。

将成员添加到副本集
将成员添加到副本集,在多台机器上启动mongod 实例。现在开始一个mongod 客户和发出命令 rs.add().

rs.add("mongod1.net:27017")

java
建立连接

import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.WriteConcern;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.DBCursor;
import com.mongodb.ServerAddress;

import java.util.Arrays;

// To connect to mongodb server
MongoClient mongoClient = new MongoClient( "localhost" , 27017 );
// Now connect to your databases
DB db = mongoClient.getDB( "test" );
boolean auth = db.authenticate(myUserName, myPassword);

获取一个集合列表

Set colls = db.getCollectionNames();
for (String s : colls) {
   System.out.println(s);
}

获取/选择一个集合

DBCollection coll = db.getCollection("mycol");

插入文档

BasicDBObject doc = new BasicDBObject("title", "MongoDB").
   append("description", "database").
   append("likes", 100).
   append("url", "http://www.yiibai.com/mongodb/").
   append("by", "yiibai.com").
   ;
coll.insert(doc);

查找第一个文档

DBObject myDoc = coll.findOne();
System.out.println(myDoc);

和spring配合

<mongo:mongo id="mongo" replica-set="${mongodb.url}">
    <mongo:options connections-per-host="${mongo.connectionsPerHost}"
            threads-allowed-to-block-for-connection-multiplier="${mongo.threadsAllowedToBlockForConnectionMultiplier}"
            connect-timeout="${mongo.connectTimeout}"
            max-wait-time="${mongo.maxWaitTime}"
            auto-connect-retry="${mongo.autoConnectRetry}"
            socket-keep-alive="${mongo.socketKeepAlive}"
            socket-timeout="${mongo.socketTimeout}" slave-ok="${mongo.slaveOk}"
            write-number="1" write-timeout="0" write-fsync="true" />
        <mongo:options write-number="1" write-timeout="0"
            write-fsync="true" />
    </mongo:mongo>
<mongo:db-factory dbname="${mongodb.dbname}" mongo-ref="mongo" />
    <bean id="mongoTemplate" class="org.springframework.data.mongodb.core.MongoTemplate">
        <constructor-arg name="mongoDbFactory" ref="mongoDbFactory" />
    </bean>

使用 MongoTemplate

  Person p = new Person("Joe", 34);

  // Insert is used to initially store the object into the database.
  mongoOps.insert(p);
  log.info("Insert: " + p);

  // Find
  p = mongoOps.findById(p.getId(), Person.class);   
  log.info("Found: " + p);

  // Update
  mongoOps.updateFirst(query(where("name").is("Joe")), update("age", 35), Person.class);    
  p = mongoOps.findOne(query(where("name").is("Joe")), Person.class);
  log.info("Updated: " + p);

  // Delete
  mongoOps.remove(p);

  // Check that deletion worked
  List<Person> people =  mongoOps.findAll(Person.class);
  log.info("Number of people = : " + people.size());

  mongoOps.dropCollection(Person.class);
Query query=new Query(
    Criteria.where("AAA").is(XXobj.getAAA()).
    orOperator(Criteria.where("BBB").is(XXobj.getBBB()))
    );
List<XXObject> result = mongoTemplate.find(query, XXObject.class);
if(result!=null && !result.isEmpty()){
    return result.get(0);
}

XXObject obj = mongoTemplate.findOne(query, XXObject.class);
if(obj!=null){
    return obj;
}

Tomcat源码

在Tomcat架构中,Connector主要负责处理与客户端的通信。Connector的实例用于监听端口,接受来自客户端的请求并将请求转交给Engine处理。同时将来自Engine的答复返回给客户端。

  • AJP Connector, 基于AJP协议,AJP是专门设计用来为tomcat与http服务器之间通信专门定制的协议,能提供较高的通信速度和效率。如与Apache服务器集成时,采用这个协议。
  • APR HTTP Connector, 用C实现,通过JNI调用的。主要提升对静态资源(如HTML、图片、CSS、JS等)的访问性能。现在这个库已独立出来可用在任何项目中。Tomcat在配置APR之后性能非常强劲。
    具体地,Tomcat7中实现了以下几种Connector:
  • org.apache.coyote.http11.Http11Protocol : 支持HTTP/1.1 协议的连接器。
  • org.apache.coyote.http11.Http11NioProtocol : 支持HTTP/1.1 协议+New IO的连接器。
  • org.apache.coyote.http11.Http11AprProtocol : 使用APR(Apache portable runtime)技术的连接器,利

用Native代码与本地服务器(如linux)来提高性能。
(以上三种Connector实现都是直接处理来自客户端Http请求,加上NIO或者APR)

  • org.apache.coyote.ajp.AjpProtocol:使用AJP协议的连接器,实现与web server(如Apache httpd)之间的通信
  • org.apache.coyote.ajp.AjpNioProtocol:SJP协议+ New IO
  • org.apache.coyote.ajp.AjpAprProtocol:AJP + APR

(这三种实现方法则是与web server打交道,同样加上NIO和APR)

Connector的配置
(1) BIO HTTP/1.1 Connector配置
一个典型的配置如下:  

<Connector  port=”8080”  protocol=”HTTP/1.1”  maxThreads=”150”  conn ectionTimeout=”20000”   redirectPort=”8443” />

其它一些重要属性如下:
acceptCount : 接受连接request的最大连接数目,默认值是10
address : 绑定IP地址,如果不绑定,默认将绑定任何IP地址
allowTrace : 如果是true,将允许TRACE HTTP方法
compressibleMimeTypes : 各个mimeType, 以逗号分隔,如text/html,text/xml
compression : 如果带宽有限的话,可以用GZIP压缩
connectionTimeout : 超时时间,默认为60000ms (60s)
maxKeepAliveRequest : 默认值是100
maxThreads : 处理请求的Connector的线程数目,默认值为200
如果是SSL配置,如下:

<Connector port="8181" protocol="HTTP/1.1" SSLEnabled="true" 
    maxThreads="150" scheme="https" secure="true" 
    clientAuth="false" sslProtocol = "TLS" 
    address="0.0.0.0" 
    keystoreFile="E:/java/jonas-full-5.1.0-RC3/conf/keystore.jks" 
    keystorePass="changeit" />

其中,keystoreFile为证书位置,keystorePass为证书密码
(2) NIO HTTP/1.1 Connector配置

<Connector port=”8080” protocol=”org.apache.coyote.http11.Http11NioProtocol” maxThreads=”150” connectionTimeout=”20000” redirectPort=”8443”/>

(3) Native APR Connector配置
ARP是用C/C++写的,对静态资源(HTML,图片等)进行了优化。所以要下载本地库tcnative-1.dll与openssl.exe,将其放在%tomcat%\bin目录下。
下载地址是:http://tomcat.heanet.ie/native/1.1.10/binaries/win32/
在server.xml中要配置一个Listener,如下图。这个配置tomcat是默认配好的。

<!--APR library loader. Documentation at /docs/apr.html --> 
<Listener className="org.apache.catalina.core.AprLifecycleListener" SSLEngine="on" />

配置使用APR connector

<Connector port=”8080” protocol=”org.apache.coyote.http11.Http11AprProtocol” 
maxThreads=”150” connectionTimeout=”20000” redirectPort=”8443”

如果配置成功,启动tomcat,会看到如下信息:
org.apache.coyote.http11.Http11AprProtocol init
(4) AJP Connector配置:

<Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />

RabbitMQ消息队列

关于详细介绍:http://blog.csdn.net/column/details/rabbitmq.html
RabbitMQ:基于AMQP协议(Advanced Message Queue Protocol)介绍:http://www.infoq.com/cn/articles/AMQP-RabbitMQ/
ActiveMQ:基于STOMP协议

所需环境:
1.Erlang
2.RabbitMQ
3.rabbit-client.jar api

http://www.lxway.com/991402946.htm
Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
image

Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
image

Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.” 只会匹配到“audit.irs”。我在RedHat的朋友做了一张不错的图,来表明topic交换机是如何工作的:
image

 ConnectionFactory connFactory = new ConnectionFactory();//创建连接连接到MabbitMQ 
  connFactory.setUri(uri);//或 factory.setHost("localhost");  设置ip、uri或host
Connection connection = factory.newConnection();  //创建一个连接
  Channel channel = connection.createChannel();  //创建一个Channel 
channel.queueDeclare(queue, true, false, false, null);//指定队列
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());     //往队列中发出一条消息  
 //关闭频道和连接  
 channel.close();  
connection.close();  

Connecting to a broker

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();

uri

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();

Using Exchanges and Queues
声明一个exchange然后把队列和exchange和队列绑定起来(只有绑定以后,往exchange投递才会跑到相应队列)

channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

(完整的绑定过程)

channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

Publishing messages

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.basicPublish(exchangeName, routingKey, mandatory,
                     MessageProperties.PERSISTENT_TEXT_PLAIN,
                     messageBodyBytes);

delivery mode 2 (persistent), priority 1 , content-type "text/plain".

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .contentType("text/plain")
               .deliveryMode(2)
               .priority(1)
               .userId("bob")
               .build()),
               messageBodyBytes);

自定义header

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude",  51.5252949);
headers.put("longitude", -0.0905493);

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .headers(headers)
               .build()),
               messageBodyBytes);

expiration

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .expiration("60000")
               .build()),
               messageBodyBytes);

在确认模式下发布大量的信息到一个通道,等待确认

package com.rabbitmq.examples;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;

public class ConfirmDontLoseMessages {
    static int msgCount = 10000;
    final static String QUEUE_NAME = "confirm-test";
    static ConnectionFactory connectionFactory;

    public static void main(String[] args)
        throws IOException, InterruptedException
    {
        if (args.length > 0) {
                msgCount = Integer.parseInt(args[0]);
        }

        connectionFactory = new ConnectionFactory();

        // Consume msgCount messages.
        (new Thread(new Consumer())).start();
        // Publish msgCount messages and wait for confirms.
        (new Thread(new Publisher())).start();
    }

    @SuppressWarnings("ThrowablePrintedToSystemOut")
    static class Publisher implements Runnable {
        public void run() {
            try {
                long startTime = System.currentTimeMillis();

                // Setup
                Connection conn = connectionFactory.newConnection();
                Channel ch = conn.createChannel();
                ch.queueDeclare(QUEUE_NAME, true, false, false, null);
                ch.confirmSelect();

                // Publish
                for (long i = 0; i < msgCount; ++i) {
                    ch.basicPublish("", QUEUE_NAME,
                                    MessageProperties.PERSISTENT_BASIC,
                                    "nop".getBytes());
                }

                ch.waitForConfirmsOrDie();

                // Cleanup
                ch.queueDelete(QUEUE_NAME);
                ch.close();
                conn.close();

                long endTime = System.currentTimeMillis();
                System.out.printf("Test took %.3fs\n",
                                  (float)(endTime - startTime)/1000);
            } catch (Throwable e) {
                System.out.println("foobar :(");
                System.out.print(e);
            }
        }
    }

    static class Consumer implements Runnable {
        public void run() {
            try {
                // Setup
                Connection conn = connectionFactory.newConnection();
                Channel ch = conn.createChannel();
                ch.queueDeclare(QUEUE_NAME, true, false, false, null);

                // Consume
                QueueingConsumer qc = new QueueingConsumer(ch);
                ch.basicConsume(QUEUE_NAME, true, qc);
                for (int i = 0; i < msgCount; ++i) {
                    qc.nextDelivery();
                }

                // Cleanup
                ch.close();
                conn.close();
            } catch (Throwable e) {
                System.out.println("Whoosh!");
                System.out.print(e);
            }
        }
    }
}

*(the AMQP specification document)[http://www.amqp.org/]

接收消息的最有效的方法是建立一个订阅使用消费者接口。将自动被交付的消息到达,而不必显式地请求

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.getContentType();
             long deliveryTag = envelope.getDeliveryTag();
             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
         }
     });

接收个别信息

boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
    // No message retrieved.
} else {
    AMQP.BasicProperties props = response.getProps();
    byte[] body = response.getBody();
    long deliveryTag = response.getEnvelope().getDeliveryTag();
    ...
    channel.basicAck(method.deliveryTag, false); //  autoAck = false必须设置 Channel.basicAck来确认已经接受消息

处理不被路由的消息
假如一个信息被设置强制性(mandatory)的flag不被路由的话会被送到发送端。
如果客户端没有配置返回特定通道侦听器,将放弃返回的相关消息。
为了获取这个消息,客户端可以实现ReturnListener 接口还有调用 Channel.setReturnListener

channel.setReturnListener(new ReturnListener() {
    public void handleBasicReturn(int replyCode,
                                  String replyText,
                                  String exchange,
                                  String routingKey,
                                  AMQP.BasicProperties properties,
                                  byte[] body)
    throws IOException {
        ...
    }
});

关闭协议

The AMQP 0-9-1 connection and channel have the following lifecycle states:

open: the object is ready to use
closing: the object has been explicitly notified to shut down locally, has issued a shutdown request to any supporting lower-layer objects, and is waiting for their shutdown procedures to complete
closed: the object has received all shutdown-complete notification(s) from any lower-layer objects, and as a consequence has shut itself down

The AMQP connection and channel objects possess the following shutdown-related methods:

addShutdownListener(ShutdownListener listener) and removeShutdownListener(ShutdownListener listener), to manage any listeners, which will be fired when the object transitions to closed state. Note that, adding a ShutdownListener to an object that is already closed will fire the listener immediately
getCloseReason(), to allow the investigation of what was the reason of the object’s shutdown
isOpen(), useful for testing whether the object is in an open state
close(int closeCode, String closeMessage), to explictly notify the object to shut down.

import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {
    public void shutdownCompleted(ShutdownSignalException cause)
    {
        ...
    }
});

ShutdownSignalException包含了关闭时的错误异常

public void shutdownCompleted(ShutdownSignalException cause)
{
  if (cause.isHardError())
  {
    Connection conn = (Connection)cause.getReference();
    if (!cause.isInitiatedByApplication())
    {
      Method reason = cause.getReason();
      ...
    }
    ...
  } else {
    Channel ch = (Channel)cause.getReference();
    ...
  }
}

原子性的使用open

public void brokenMethod(Channel channel)
{
    if (channel.isOpen())
    {
        // The following code depends on the channel being in open state.
        // However there is a possibility of the change in the channel state
        // between isOpen() and basicQos(1) call
        ...
        channel.basicQos(1);//告诉RabbitMQ同一时间给一个消息给消费者  
    }
}

处于无效状态时应该抓取异常

public void validMethod(Channel channel)
{
    try {
        ...
        channel.basicQos(1);
    } catch (ShutdownSignalException sse) {
        // possibly check if channel was closed
        // by the time we started action and reasons for
        // closing it
        ...
    } catch (IOException ioe) {
        // check why connection was closed
        ...
    }
}

连接设置
设置pool数

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

使用地址列表

Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
                                 , new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);

心跳超时(Heartbeat Timeout) Heartbeats guide
自定义线程工厂

import com.google.appengine.api.ThreadManager;

ConnectionFactory cf = new ConnectionFactory();
cf.setThreadFactory(ThreadManager.backgroundThreadFactory());

Automatic Recovery From Network Failures

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
factory.setAutomaticRecoveryEnabled(true);
// connection that will recover automatically
Connection conn = factory.newConnection();


ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.setNetworkRecoveryInterval(10000);


ConnectionFactory factory = new ConnectionFactory();

Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")};
factory.newConnection(addresses);

The RPC (Request/Reply) Pattern

import com.rabbitmq.client.RpcClient;

RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);



byte[] primitiveCall(byte[] message);
String stringCall(String message)
Map mapCall(Map message)
Map mapCall(Object[] keyValuePairs)

h2数据库

H2内存数据库的安装和维护
http://wenku.baidu.com/link?url=kZ8Tll42ZeZvRVzsbcUyGDe3q8UKc76h162iP-wYO8lSGSqKAfu18xKz4HqG4VLQy6QobsKzDxCD81Q-uuUY2wHdN0ThzSt35Mj2cExgupS
1.安装文件
http://www.h2database.com/html/download.html
安装文件:h2-2014-04-05.zip
2.安装步骤
1)远程连接Linux服务器,进入该目录/usr/local/h2
2)上传安装文件至h2目录下;(可自行设定安装目录)
3)解压该安装文件:
[root@localhost h2]# unzip h2-2014-04-05.zip
4)解压成功后,需要修改两个文件h2.sh和build.sh的JAVA_HOME路径;
a、获取当前JAVA_HOME的路径
[root@localhost h2]# echo $JAVA_HOME
/usr/java/jdk1.6.0_45
b、h2.sh文件在h2/bin目录下;

[root@localhost h2]# cd h2/bin/
[root@localhost bin]# ls
h2-1.3.176.jar  h2.bat  h2.sh  h2w.bat

c、编辑h2.sh
[root@localhost bin]# vim h2.sh

#!/bin/sh
dir=$(dirname "$0")
java -cp "$dir/h2-1.3.176.jar:$H2DRIVERS:$CLASSPATH" org.h2.tools.Server -tcpAllowOthers  -webAllowOthers  -webPort  8090  "$@"

org.h2.tools.Server: 以服务器模式启动
-tcpAllowOthers: 允许远程机器通过TCP方式访问
-webAllowOthers: 允许远程机器通过浏览器访问
-webPort 8090: 默认的访问端口(8090为未被占用的端口,如果此端口已经被其他端口占用,则改为其他端口)

[root@localhost bin]# chmod 777 h2.sh
启动h2服务,命令:nohup ./h2.sh &

通过远程浏览器来访问h2是否部署成功
访问地址:http://服务器ip:8090/

 Jdbc:h2:tcp://服务器ip/test:  test是数据库文件的名称,不用指定test文件路径,是因为之前c、配置h2.sh已经指定默认路径了(/home/omp_nms/h2/bin/) 
 User name : sa   h2默认的用户,密码为空 
 点击如图connect按钮,NMS数据库文件自动生成到/home/omp_nms/h2/bin 目录下:NMS.h2.db 

说明:目前项目用的数据库名为:NMS,用户名:sa,密码:nms,所以使用H2数据库时,按此要求进行创建。
3、维护
1)进入远程服务器,查看h2的服务是否还在
执行命令:ps -ef |grep h2.sh,如图说明h2服务正常运行
[root@localhost bin]# ps -ef | grep h2.sh
root 27220 3387 0 09:54 pts/0 00:00:00 /bin/sh ./h2.sh
root 27316 3387 0 10:06 pts/0 00:00:00 grep h2.sh
2)通过远程浏览器访问h2数据库
访问地址:http://服务器ip:8090/
出现如下页面,user name输入:_,password输入:_,最顶部下拉框可以选择显示语言,并点击test connection会出现test successful说明h2正常工作,

如上图所示,点击Connect按钮,即可进入H2数据库,进行表的相关操作;(脚本内容执行后,在下图左侧可以看到创建的库表)

3)如何修改用户密码
要修改密码,需要登录进去后用sql语句进行修改,如下语句,登录NMS数据库后将sa用户的密码修改为nms
ALTER USER sa SET { PASSWORD 'nms' }
4)创建数据库和表
当H2数据库安装成功并启动成功后,访问地址:http://服务器ip:8090/,会出现如下图

创建数据库
如上图,将JDBC URL: jdbc:h2:~/test,最后的test修改为NMS,然后username为sa,password为nms(注:如果是第一次创建数据库时,输入的密码为nms,则之后每次登陆都要用此密码),点击connect按钮,就可以进入到NMS数据库后台,如下图(下图左侧在实际登录后是没有库表的,需要执行我们提供的建表脚本后才有)

创建表
打开nmsH2_createTable.sql脚本,将里面的全部内容复制到上图SQL语句区域,点击“Run”,即可成功创建表。 另外,也可以在SQL语句区域手工输入创建表语句进行表创建。不支持可视化创建表

5)数据库连接 URL说明
数据库支持多种连接模式和连接设置,不同的连接模式和连接设置是通过不同的URL来区分的,URL中的设置是不区分大小写。
Topic URL Format and Examples
嵌入式(本地)连接 jdbc:h2:[file:][]

    jdbc:h2:~/test  
    jdbc:h2:file:/data/sample   
    jdbc:h2:file:C:/data/sample (Windows only)  

内存数据库(私有) jdbc:h2:mem:
内存数据库(被命名) jdbc:h2:mem:
jdbc:h2:mem:test_mem
使用TCP/IP的服务器模式(远程连接) jdbc:h2:tcp://[:]/[]

    jdbc:h2:tcp://localhost/~/test  
    jdbc:h2:tcp://dbserv:8084/~/sample  

使用SSL/TLS的服务器模式(远程连接) jdbc:h2:ssl://[:]/
jdbc:h2:ssl://secureserv:8085//sample;
使用加密文件 jdbc:h2:;CIPHER=[AES|XTEA]
jdbc:h2:ssl://secureserv/
/testdb;CIPHER=AES
jdbc:h2:file:/secure;CIPHER=XTEA
文件锁 jdbc:h2:;FILE_LOCK={NO|FILE|SOCKET}
jdbc:h2:file:
/quickAndDirty;FILE_LOCK=NO
jdbc:h2:file:/private;CIPHER=XTEA;FILE_LOCK=SOCKET
仅打开存在的数据库 jdbc:h2:;IFEXISTS=TRUE
jdbc:h2:file:
/sample;IFEXISTS=TRUE
当虚拟机退出时并不关闭数据库 jdbc:h2:;DB_CLOSE_ON_EXIT=FALSE
用户名和密码 jdbc:h2:[;USER=][;PASSWORD=]
jdbc:h2:file:/sample;USER=sa;PASSWORD=123
更新记入索引 jdbc:h2:;LOG=2
jdbc:h2:file:
/sample;LOG=2
调试跟踪项设置 jdbc:h2:;TRACE_LEVEL_FILE=<level 0..3>
jdbc:h2:file:/sample;TRACE_LEVEL_FILE=3
忽略位置参数设置 jdbc:h2:;IGNORE_UNKNOWN_SETTINGS=TRUE
指定文件读写模式 jdbc:h2:;ACCESS_MODE_LOG=rws;ACCESS_MODE_DATA=rws
在Zip文件中的数据库 jdbc:h2:zip:!/
jdbc:h2:zip:
/db.zip!/test
兼容模式 jdbc:h2:;MODE=
jdbc:h2:/test;MODE=MYSQL
自动重连接 jdbc:h2:;AUTO_RECONNECT=TRUE
jdbc:h2:tcp://localhost/
/test;AUTO_RECONNECT=TRUE
自动混合模式 jdbc:h2:;AUTO_SERVER=TRUE
jdbc:h2:/test;AUTO_SERVER=TRUE
更改其他设置 jdbc:h2:;=[;=...]
jdbc:h2:file:
/sample;TRACE_LEVEL_SYSTEM_OUT=3

jackson过滤空值

<bean id="notNullObjectMapper" class="org.codehaus.jackson.map.ObjectMapper">
        <property name="serializationInclusion">
            <value type="org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion">NON_NULL</value>
        </property>
    </bean>


    <mvc:annotation-driven>
        <mvc:message-converters>
            <bean class="org.springframework.http.converter.json.MappingJacksonHttpMessageConverter">
                <property name="supportedMediaTypes" value="application/json;charset=UTF-8" />
                <property name="objectMapper" ref="notNullObjectMapper" />
            </bean>

        </mvc:message-converters>
        <mvc:argument-resolvers>
            <bean class="XXX.XXXArgumentResolver" />
        </mvc:argument-resolvers>
    </mvc:annotation-driven>

scala 学习(三)

List

val numbers = List(1, 2, 3, 4)
1::2::3::4

set

Set(1,2,3)

Seq

scala> Seq(1, 1, 2)
res3: Seq[Int] = List(1, 1, 2)

Tuple

val hostPort = ("localhost", 80)

hostPort._1
hostPort._2


hostPort match {
  case ("localhost", port) => ...
  case (host, port) => ...
}


scala> val h2 = "localhost"->80
h2: (String, Int) = (localhost,80)

scala> h2._1
res35: String = localhost

scala> h2._2
res36: Int = 80

scala> val (host,port) = h2
host: String = localhost
port: Int = 80
val t = (1 , 3.3 , "ews")
val value = t._2//get
val (fir ,sec , thir) = t
val (fir ,sec , _) = t//第三个不需要
"New York" .partition( _.isUpper)//("NY","ew ork")

map

//不可变
val scores = Map( "alice" -> 10 , "Bob" -> 3)
val scores = Map( ("alice" ,10) , ("Bob" , 3))
//可变
val scores = scala.collection.mutable.Map("alice" -> 10 , "Bob" -> 3)
val scores = scala.collection.mutable.HashMap[String , Int ]

//获取值
scores("Bob")
scores.getOrElse("Bob" , 0)
//set(可变)
scores("Bob") = 10
score += ("Bob"->10,"Fred" ->7)
for ((k,v) <- scores) //处理

方法

Traversable

下面所有方法在子类中都是可用的。参数和返回值的类型可能会因为子类的覆盖而看起来不同。

def head : A
def tail : Traversable[A]

这里是函数组合子定义的地方。

def map [B] (f: (A) => B) : CC[B]

返回每个元素都被 f 转化的集合

def foreach[U](f: Elem => U): Unit

在集合中的每个元素上执行 f 。

def find (p: (A) => Boolean) : Option[A]

返回匹配谓词函数的第一个元素

def filter (p: (A) => Boolean) : Traversable[A]

返回所有匹配谓词函数的元素集合

划分:

def partition (p: (A) ⇒ Boolean) : (Traversable[A], Traversable[A])

按照谓词函数把一个集合分割成两部分

def groupBy [K] (f: (A) => K) : Map[K, Traversable[A]]

转换:

有趣的是,你可以转换集合类型。

def toArray : Array[A]
def toArray [B >: A] (implicit arg0: ClassManifest[B]) : Array[B]
def toBuffer [B >: A] : Buffer[B]
def toIndexedSeq [B >: A] : IndexedSeq[B]
def toIterable : Iterable[A]
def toIterator : Iterator[A]
def toList : List[A]
def toMap [T, U] (implicit ev: <:<[A, (T, U)]) : Map[T, U]
def toSeq : Seq[A]
def toSet [B >: A] : Set[B]
def toStream : Stream[A]
def toString () : String
def toTraversable : Traversable[A]

把映射转换为一个数组,您会得到一个键值对的数组。

scala> Map(1 -> 2).toArray
res41: Array[(Int, Int)] = Array((1,2))

与Java

import scala.collection.JavaConverters._
  val sl = new scala.collection.mutable.ListBuffer[Int]
  val jl : java.util.List[Int] = sl.asJava
  val sl2 : scala.collection.mutable.Buffer[Int] = jl.asScala
  assert(sl eq sl2)

双向转换:

scala.collection.Iterable <=> java.lang.Iterable
scala.collection.Iterable <=> java.util.Collection
scala.collection.Iterator <=> java.util.{ Iterator, Enumeration }
scala.collection.mutable.Buffer <=> java.util.List
scala.collection.mutable.Set <=> java.util.Set
scala.collection.mutable.Map <=> java.util.{ Map, Dictionary }
scala.collection.mutable.ConcurrentMap <=> java.util.concurrent.ConcurrentMap

此外,也提供了以下单向转换

scala.collection.Seq => java.util.List
scala.collection.mutable.Seq => java.util.List
scala.collection.Set => java.util.Set
scala.collection.Map => java.util.Map

选项 Option

Option 是一个表示有可能包含值的容器。
Option基本的接口是这样的:

trait Option[T] {
  def isDefined: Boolean
  def get: T
  def getOrElse(t: T): T
}

Write

var username: Option[String] = None
...
username = Some("foobar")

instead of

var username: String = null
...
username = "foobar"

write

opt foreach { value =>
  operate(value)
}

instead of

if (opt.isDefined)
  operate(opt.get)
val result = res1 match {
  case Some(n) => n * 2
  case None => 0
}

拉链操作

val sys = Array( "<" , "-" , ">" )
val cos = Array( 2 , 10 , 2 )
val pairs = sys.zip(cos)
//结果 Array( ("<",2) , ("-",10) , (">",2) )

//getter/setter
age= / age_=

私有字段,但是如下,Counter对象之间可以互相访问

class Counter {
 private var value = 0
 def increment() { value += 1 }
 def isLess( other : Counter ) = value < other.value
}

如果想要完全变成只能访问本对象

private[this] var value= 0

Bean 属性

class Person{
@BeanProperty var name : String = _
}

主构造

class Person( val name :String ,val age : Int){}

class Person( val name :String ="" ,private val age : Int = 0)

抽象类

abstract class Shape {
       def getArea():Int    // subclass should define this
}

特质(Traits)

trait Car {
  val brand: String
}

class BMW extends Car {
  val brand = "BMW"
}

通过with关键字,一个类可以扩展多个特质:

class BMW extends Car with Shiny {
  val brand = "BMW"
  val shineRefraction = 12
}

特质是可以有实现的

trait ConsoleLogger{
def log(msg:String)=println(msg)
}

class ALogger extends ConsoleLogger

scala> val a = new ALogger
a: ALogger = ALogger@32193528

scala> a.log("1")
1

trait Cache[K, V] {
  def get(key: K): V
  def put(key: K, value: V)
  def delete(key: K)
}

对象

也叫单例对象,单例对象用于持有一个类的唯一实例。通常用于工厂模式

object Timer {
  var count = 0

  def currentCount(): Long = {
    count += 1
    count
  }
}
class Student(name:String)
object Student{
def apply(name:String) = new Student(name)
}

scala> Student("2")
res26: Student = Student@290ad6a9

scala 学习(一)

scala官网描述了这个语言环境搭建的三种方式

  1. 下载windows msi包
  2. Lightbend Activator zip压缩包
  3. Eclipse、NetBeans和Intellij三种IDE插件

下面我介绍下用Intellij如何加入插件

  1. 快捷键ctrl+alt+s 打开setting弹窗
  2. 输入plugins
  3. 右边输入scala,没有的话,点击下面的search in respo... 找到安装

CentOS 下对 Nginx + Tomcat 配置 SSL 实现服务器 / 客户端双向认证

来源:http://blog.csdn.net/defonds/article/details/44410359

方便复制:

安装

验证安装

sudo /usr/local/nginx/sbin/nginx -t
nginx: the configuration file /usr/local/nginx/conf/nginx.conf syntax is ok
nginx: configuration file /usr/local/nginx/conf/nginx.conf test is successful
证明 Nginx 安装成功。

SSL 服务器 / 客户端双向验证证书的生成

创建一个新的 CA 根证书

sudo mkdir /usr/local/nginx/ca
 sudo mkdir /usr/local/nginx/ca/newcerts /usr/local/nginx/ca/private /usr/local/nginx/ca/conf /usr/local/nginx/ca/server

sudo vim /usr/local/nginx/ca/conf/openssl.conf

[ ca ]
default_ca      = foo                   # The default ca section

[ foo ]
dir            = /usr/local/nginx/ca         # top dir
database       = /usr/local/nginx/ca/index.txt          # index file.
new_certs_dir  = /usr/local/nginx/ca/newcerts           # new certs dir

certificate    = /usr/local/nginx/ca/private/ca.crt         # The CA cert
serial         = /usr/local/nginx/ca/serial             # serial no file
private_key    = /usr/local/nginx/ca/private/ca.key  # CA private key
RANDFILE       = /usr/local/nginx/ca/private/.rand      # random number file

default_days   = 365                     # how long to certify for
default_crl_days= 30                     # how long before next CRL
default_md     = md5                     # message digest method to use
unique_subject = no                      # Set to 'no' to allow creation of
                                         # several ctificates with same subject.
policy         = policy_any              # default policy

[ policy_any ]
countryName = match
stateOrProvinceName = match
organizationName = match
organizationalUnitName = match
localityName            = optional
commonName              = supplied
emailAddress            = optional

生成私钥 key 文件

cd /usr/local/nginx/ca
sudo openssl genrsa -out private/ca.key

输出
Generating RSA private key, 512 bit long modulus
..++++++++++++
.++++++++++++
e is 65537 (0x10001)
private 目录下有 ca.key 文件生成。
注:openssl 默认生成 512 位的。一般是用 2048 位的。

生成证书请求 csr 文件

sudo openssl req -new -key private/ca.key -out private/ca.csr
提示输入 Country Name,输入 CN 并回车.提示输入 State or Province Name (full name),输入 Shanghai 并回车.提示输入 Locality Name,输入 Shanghai 并回车.提示输入 Organization Name,输入 Defonds 并回车.提示输入 Organizational Unit Name,输入 Dev 并回车.提示输入 Common Name,如果没有域名的话,输入 localhost 并回车.提示输入 Email Address,输入 [email protected] 并回车.提示输入 A challenge password,这个是根证书口令。输入 defonds 并回车.提示输入 An optional company name,输入 df 并回车。private 目录下有 ca.csr 文件生成。

生成凭证 crt 文件

sudo openssl x509 -req -days 365 -in private/ca.csr -signkey private/ca.key -out private/ca.crt
控制台输出
Signature ok
subject=/C=CN/ST=Shanghai/L=Shanghai/O=Defonds/OU=Dev/CN=localhost/emailAddress=[email protected]
Getting Private key
private 目录下有 ca.crt 文件生成。

为我们的 key 设置起始序列号

sudo echo FACE > serial
可以是任意四个字符
注:如果不行,sudo sh -c 'echo FACE > serial'

创建 CA 键库

sudo touch index.txt

为 "用户证书" 的移除创建一个证书撤销列表

sudo openssl ca -gencrl -out /usr/local/nginx/ca/private/ca.crl -crldays 7 -config "/usr/local/nginx/ca/conf/openssl.conf"

输出
Using configuration from /usr/local/nginx/ca/conf/openssl.conf
private 目录下有 ca.crl 文件生成。

服务器证书的生成

创建一个 key

sudo openssl genrsa -out server/server.key

输出
Generating RSA private key, 512 bit long modulus
...........................++++++++++++
.................++++++++++++
e is 65537 (0x10001)
server 目录下有 server.key 文件生成。
注:openssl 默认生成 512 位的。一般是用 2048 位的:sudo openssl genrsa -out server/server.key 2048

为我们的 key 创建一个证书签名请求 csr 文件

sudo openssl req -new -key server/server.key -out server/server.csr
这时会要求你输入和 2.1.2.2 步一样的那些问题,所有输入需要和那一步一致。但 A challenge password 是服务器证书口令,可以与根证书口令一致。这里:

server 目录下有 server.csr 文件生成。

使用我们私有的 CA key 为刚才的 key 签名

sudo openssl ca -in server/server.csr -cert private/ca.crt -keyfile private/ca.key -out server/server.crt -config "/usr/local/nginx/ca/conf/openssl.conf"
输出
两次都输入 y,server 目录下有 server.crt 文件生成。

客户端证书的生成

创建存放 key 的目录 users

sudo mkdir users
位置 /usr/local/nginx/ca/users。

为用户创建一个 key

sudo openssl genrsa -des3 -out /usr/local/nginx/ca/users/client.key 1024

要求输入 pass phrase,这个是当前 key 的口令,以防止本密钥泄漏后被人盗用。两次输入同一个密码(比如我这里输入 defonds),users 目录下有 client.key 文件生成。

为 key 创建一个证书签名请求 csr 文件

` sudo openssl req -new -key /usr/local/nginx/ca/users/client.key -out /usr/local/nginx/ca/users/client.csr
提示输入 pass phrase,即 client.key 的口令。将 2.3.2 步保存的 pass phrase 输入后并回车:
要求你输入和 2.1.3 步一样的那些问题。输入需要和那一步一致。但 A challenge password 是客户端证书口令(请注意将它和 client.key 的口令区分开!),可以与服务器端证书或者根证书口令一致:
users 目录下有 client.csr 文件生成。

使用我们私有的 CA key 为刚才的 key 签名

sudo openssl ca -in /usr/local/nginx/ca/users/client.csr -cert /usr/local/nginx/ca/private/ca.crt -keyfile /usr/local/nginx/ca/private/ca.key -out /usr/local/nginx/ca/users/client.crt -config "/usr/local/nginx/ca/conf/openssl.conf"
输出
Using configuration from /usr/local/nginx/ca/conf/openssl.conf
Check that the request matches the signature
Signature ok
The Subject's Distinguished Name is as follows
countryName :PRINTABLE:'CN'
stateOrProvinceName :PRINTABLE:'Shanghai'
localityName :PRINTABLE:'Shanghai'
organizationName :PRINTABLE:'Defonds'
organizationalUnitName:PRINTABLE:'Dev'
commonName :PRINTABLE:'localhost'
emailAddress :IA5STRING:'[email protected]'
Certificate is to be certified until Mar 16 11:47:48 2016 GMT (365 days)
Sign the certificate? [y/n]:y

1 out of 1 certificate requests certified, commit? [y/n]y
Write out database with 1 new entries
Data Base Updated
两次都输入 y,users 目录下有 client.crt 文件生成。
`

将证书转换为大多数浏览器都能识别的 PKCS12 文件

sudo openssl pkcs12 -export -clcerts -in /usr/local/nginx/ca/users/client.crt -inkey /usr/local/nginx/ca/users/client.key -out /usr/local/nginx/ca/users/client.p12

要求输入 client.key 的 pass phrase,输入 2.3.2 步输入的 pass phrase 并回车后:
要求输入 Export Password,这个是客户端证书的保护密码(其作用类似于 2.3.3 保存的口令),在客户端安装证书的时候需要输入这个密码。我还是输入 defonds。users 目录下有 client.p12 文件生成。

nginx.conf

worker_processes  1;

error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       mime.types;
    default_type  application/octet-stream;

    log_format  main  '[$time_local] $remote_addr - "$request" '
                      '$status "$http_user_agent" '
                      '"$args"';

    access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  120;
    client_max_body_size    120m;
    client_body_buffer_size 128k;
    server_names_hash_bucket_size 128;
    large_client_header_buffers 4 4k;
    open_file_cache max=8192 inactive=20s;
    open_file_cache_min_uses 1;
    open_file_cache_valid 30s;

    upstream tomcat_server {
    # Tomcat is listening on default 8080 port
        server 192.168.1.177:8080 fail_timeout=0;
    }

    server {
        listen       443;
        server_name  localhost;
        ssi on;
        ssi_silent_errors on;
        ssi_types text/shtml;

        ssl                  on;
        ssl_certificate      /usr/local/nginx/ca/server/server.crt;
        ssl_certificate_key  /usr/local/nginx/ca/server/server.key;
        ssl_client_certificate /usr/local/nginx/ca/private/ca.crt;

        ssl_session_timeout  5m;
        ssl_verify_client on;  #开户客户端证书验证

        ssl_protocols  SSLv2 SSLv3 TLSv1;
        ssl_ciphers ALL:!ADH:!EXPORT56:RC4+RSA:+HIGH:+MEDIUM:+LOW:+SSLv2:+EXP;
        ssl_prefer_server_ciphers   on;

        charset utf-8;
        access_log  logs/host.access.log  main;

        #error_page  404              /404.html;

        # redirect server error pages to the static page /50x.html
        #

        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
                root   html;
        }
        location = /favicon.ico {
                log_not_found off;
                access_log off;
                expires      90d;
        }
        location /swifton/ {
                proxy_pass http://tomcat_server;
                include proxy.conf;
        }      

    }
}

Tomcat 配置

Nginx 反向代理 HTTP 不需要更改 Tomcat 配置。与 HTTP 代理不同的是,这里需要通过更改 tomcat 的配置文件来告诉它前面的 HTTPS 代理。将 %tomcat%/conf/ 以下部分:

    <Connector port="8080" protocol="HTTP/1.1" 
               connectionTimeout="20000" 
               redirectPort="8443"
               scheme="https"
               proxyName="localhost"
               proxyPort="443" />

[转载]Cache应用中的服务过载案例研究

原文:http://tech.meituan.com/avalanche-study.html?from=timeline&isappinstalled=0
简单地说,过载是外部请求对系统的访问量突然激增,造成请求堆积,服务不可用,最终导致系统崩溃。本文主要分析引入Cache可能造成的服务过载,并讨论相关的预防、恢复策略。Cache在现代系统中使用广泛,由此引入的服务过载隐患无处不在,但却非常隐蔽,容易被忽视。本文希望能为开发者在设计和编写相关类型应用,以及服务过载发生处理时能够有章可循。

一个服务过载案例
本文讨论的案例是指存在正常调用关系的两个系统(假设调用方为A系统,服务方为B系统),A系统对B系统的访问突然超出B系统的承受能力,造成B系统崩溃。造成服务过载的原因很多,这里分析的是严重依赖Cache的系统服务过载。首先来看一种包含Cache的体系结构(如下图所示)。
qq 20160826092435

A系统依赖B系统的读服务,A系统是60台机器组成的集群,B系统是6台机器组成的集群,之所以6台机器能够扛住60台机器的访问,是因为A系统并不是每次都访问B,而是首先请求Cache,只有Cache的相应数据失效时才会请求B。

这正是Cache存在的意义,它让B系统节省了大量机器;如果没有Cache,B系统不得不组成60台机器的集群,如果A也同时依赖除B系统外的另一个系统(假设为C系统)呢?那么C系统也要60台机器,放大的流量将很快耗尽公司的资源。

然而Cache的引入也不是十全十美的,这个结构中如果Cache发生问题,全部的流量将流向依赖方,造成流量激增,从而引发依赖系统的过载。

回到A和B的架构,造成服务过载的原因至少有下面三种:

B系统的前置代理发生故障或者其他原因造成B系统暂时不可用,等B系统系统服务恢复时,其流量将远远超过正常值。
Cache系统故障,A系统的流量将全部流到B系统,造成B系统过载。
Cache故障恢复,但这时Cache为空,Cache瞬间命中率为0,相当于Cache被击穿,造成B系统过载。
第一个原因不太好理解,为什么B系统恢复后流量会猛增呢?主要原因就是缓存的超时时间。当有数据超时的时候,A系统会访问B系统,但是这时候B系统偏偏故障不可用,那么这个数据只好超时,等发现B系统恢复时,发现缓存里的B系统数据已经都超时了,都成了旧数据,这时当然所有的请求就打到了B。

下文主要介绍服务过载的预防和发生后的一些补救方法,以预防为主,从调用方和服务方的视角阐述一些可行方案。

服务过载的预防
所谓Client端指的就是上文结构中的A系统,相对于B系统,A系统就是B系统的Client,B系统相当于Server。

Client端的方案
针对上文阐述的造成服务过载的三个原因:B系统故障恢复、Cache故障、Cache故障恢复,我们看看A系统有哪些方案可以应对。

合理使用Cache应对B系统宕机
一般情况下,Cache的每个Key除了对应Value,还对应一个过期时间T,在T内,get操作直接在Cache中拿到Key对应Value并返回。但是在T到达时,get操作主要有五种模式:

  1. 基于超时的简单(stupid)模式

在T到达后,任何线程get操作发现Cache中的Key和对应Value将被清除或标记为不可用,get操作将发起调用远程服务获取Key对应的Value,并更新写回Cache,然后get操作返回新值;如果远程获取Key-Value失败,则get抛出异常。

为了便于理解,举一个码头工人取货的例子:5个工人(线程)去港口取同样Key的货(get),发现货已经过期被扔掉了,这时5个工人各自分别去对岸取新货,然后返回。

  1. 基于超时的常规模式

在T到达后,Cache中的Key和对应Value将被清除或标记为不可用,get操作将调用远程服务获取Key对应的Value,并更新写回Cache;此时,如果另一个线程发现Key和Value已经不可用,get操作还需要判断有没有其他线程发起了远程调用,如果有,那么自己就等待,直到那个线程远程获取操作成功,Cache中得Key变得可用,get操作返回新的Value。如果远程获取操作失败,则get操作抛出异常,不会返回任何Value。

还是码头工人的例子:5个工人(线程)去港口取同样Key的货(get),发现货已经过期被扔掉了,那么只需派出一个人去对岸取货,其他四个人在港口等待即可,而不用5个人全去。

基于超时的简单模式和常规模式区别在于对于同一个超时的Key,前者每个get线程一旦发现Key不存在,则发起远程调用获取值;而后者每个get线程发现Key不存在,则还要判断当前是否有其他线程已经发起了远程调用操作获取新值,如果有,自己就简单的等待即可。

显然基于超时的常规模式比基于超时的简单模式更加优化,减少了超时时并发访问后端的调用量。

实现基于超时的常规模式就需要用到经典的Double-checked locking惯用法了。

  1. 基于刷新的简单(stupid)模式

在T到达后,Cache中的Key和相应Value不动,但是如果有线程调用get操作,将触发refresh操作,根据get和refresh的同步关系,又分为两种模式:

同步模式:任何线程发现Key过期,都触发一次refresh操作,get操作等待refresh操作结束,refresh结束后,get操作返回当前Cache中Key对应的Value。注意refresh操作结束并不意味着refresh成功,还可能抛了异常,没有更新Cache,但是get操作不管,get操作返回的值可能是旧值。
异步模式:任何线程发现Key过期,都触发一次refresh操作,get操作触发refresh操作,不等refresh完成,直接返回Cache中的旧值。
举上面码头工人的例子说明基于刷新的常规模式:这次还是5工人去港口取货,这时货都在,但是已经旧了,这时5个工人有两种选择:

5个人各自去远程取新货,如果取货失败,则拿着旧货返回(同步模式)
5个人各自通知5个雇佣工去取新货,5个工人拿着旧货先回(异步模式)
4. 基于刷新的常规模式

在T到达后,Cache中的Key和相应Value都不会被清除,而是被标记为旧数据,如果有线程调用get操作,将触发refresh更新操作,根据get和refresh的同步关系,又分为两种模式:

同步模式:get操作等待refresh操作结束,refresh结束后,get操作返回当前Cache中Key对应的Value,注意:refresh操作结束并不意味着refresh成功,还可能抛了异常,没有更新Cache,但是get操作不管,get操作返回的值可能是旧值。如果其他线程进行get操作,Key已经过期,并且发现有线程触发了refresh操作,则自己不等refresh完成直接返回旧值。
异步模式:get操作触发refresh操作,不等refresh完成,直接返回Cache中的旧值。如果其他线程进行get操作,发现Key已经过期,并且发现有线程触发了refresh操作,则自己不等refresh完成直接返回旧值。
再举上面码头工人的例子说明基于刷新的常规模式:这次还是5工人去港口取货,这时货都在,但是已经旧了,这时5个工人有两种选择:

派一个人去远方港口取新货,其余4个人拿着旧货先回(同步模式)。
5个人通知一个雇佣工去远方取新货,5个人都拿着旧货先回(异步模式)。
基于刷新的简单模式和基于刷新的常规模式区别就在于取数线程之间能否感知当前数据是否正处在刷新状态,因为基于刷新的简单模式中取数线程无法感知当前过期数据是否正处在刷新状态,所以每个取数线程都会触发一个刷新操作,造成一定的线程资源浪费。

而基于超时的常规模式和基于刷新的常规模式区别在于前者过期数据将不能对外访问,所以一旦数据过期,各线程要么拿到数据,要么抛出异常;后者过期数据可以对外访问,所以一旦数据过期,各线程要么拿到新数据,要么拿到旧数据。

  1. 基于刷新的续费模式

该模式和基于刷新的常规模式唯一的区别在于refresh操作超时或失败的处理上。在基于刷新的常规模式中,refresh操作超时或失败时抛出异常,Cache中的相应Key-Value还是旧值,这样下一个get操作到来时又会触发一次refresh操作。

在基于刷新的续费模式中,如果refresh操作失败,那么refresh将把旧值当成新值返回,这样就相当于旧值又被续费了T时间,后续T时间内get操作将取到这个续费的旧值而不会触发refresh操作。

基于刷新的续费模式也像常规模式那样分为同步模式和异步模式,不再赘述。

下面讨论这5种Cache get模式在服务过载发生时的表现,首先假设如下:

假设A系统的访问量为每分钟M次。
假设Cache能存Key为C个,并且Key空间有N个。
假设正常状态下,B系统访问量为每分钟W次,显然W<<M。
这时因为某种原因,比如B长时间故障,造成Cache中得Key全部过期,B系统这时从故障中恢复,五种get模式分析表现分析如下:

在基于超时和刷新的简单模式中,B系统的瞬间流量将达到和A的瞬时流量M大体等同,相当于Cache被击穿。这就发生了服务过载,这时刚刚恢复的B系统将肯定会被大流量压垮。
在基于超时和刷新的常规模式中,B系统的瞬间流量将和Cache中Key空间N大体等同。这时是否发生服务过载,就要看Key空间N是否超过B系统的流量上限了。
在基于刷新的续费模式中,B系统的瞬间流量为W,和正常情况相同而不会发生服务过载。实际上,在基于刷新的续费模式中,不存在Cache Key全部过期的情况,就算把B系统永久性地干掉,A系统的Cache也会基于旧值长久的平稳运行。
第3点,B系统不会发生服务过载的主要原因是基于刷新的续费模式下不会出现chache中的Key全部长时间过期的情况,即使B系统长时间不可用,基于刷新的续费模式也会在一个过期周期内把旧值当成新值继续使用。所以当B系统恢复时,A系统的Cache都处在正常工作状态。

从B系统的角度看,能够抵抗服务过载的基于刷新的续费模式最优。

从A系统的角度看,由于一般情况下A系统是一个高访问量的在线web应用,这种应用最讨厌的一个词就是“线程等待”,因此基于刷新的各种异步模式较优。

综合考虑,基于刷新的异步续费模式是首选。

然而凡是有利就有弊,有两点需要注意的地方:

基于刷新模式最大的缺点是Key-Value一旦放入Cache就不会被清除,每次更新也是新值覆盖旧值,JVM GC永远无法对其进行垃圾收集,而基于超时的模式中,Key-Value超时后如果新的访问没有到来,内存是可以被GC垃圾回收的。所以如果你使用的是寸土寸金的本地内存做Cache就要小心了。
基于刷新的续费模式需要做好监控,不然有可能Cache中的值已经和真实的值相差很远了,应用还以为是新值而使用。
关于具体的Cache,来自Google的Guava本地缓存库支持上文的第二种、第四种和第五种get操作模式。

但是对于Redis等分布式缓存,只提供原始的get、set方法,而提供的get仅仅是获取,与上文提到的五种get操作模式不是一个概念。开发者想用这五种get操作模式的话不得不自己封装和实现。

五种get操作模式中,基于超时和刷新的简单模式是实现起来最简单的模式,但遗憾的是这两种模式对服务过载完全无免疫力,这可能也是服务过载在大量依赖缓存的系统中频繁发生的一个重要原因吧。

本文之所以把第1、3种模式称为stupid模式,是想强调这种模式应该尽量避免,Guava里面根本没有这种模式,而Redis只提供简单的读写操作,很容易就把系统实现成了这种方式。

应对分布式Cache宕机
如果是Cache直接挂了,那么就算是基于刷新的异步续费模式也无能为力了。这时A系统铁定无法对Cache进行存取操作,只能将流量完全打到B系统,B系统面对服务过载在劫难逃......

本节讨论的预防Cache宕机仅限于分布式Cache,因为本地Cache一般和A系统应用共享内存和进程,本地Cache挂了A系统也挂了,不会出现本地Cache挂了而A系统应用正常的情况。

首先,A系统请求线程检查分布式Cache状态,如果无应答则说明分布式Cache挂了,则转向请求B系统,这样一来大流量将压垮B系统。这时可选的方案如下:

A系统的当前线程不请求B系统,而是打个日志并设置一个默认值。
A系统的当前线程按照一定概率决定是否请求B系统。
A系统的当前线程检查B系统运行情况,如果良好则请求B系统。
方案1最简单,A系统知道如果没有Cache,B系统可能扛不住自己的全部流量,索性不请求B系统,等待Cache恢复。但这时B系统利用率为0,显然不是最优方案,而且当请求的Value不容易设置默认值时,这个方案就不行了。

方案2可以让一部分线程请求B系统,这部分请求肯定能被B系统hold住。可以保守的设置这个概率 u =(B系统的平均流量)/(A系统的峰值流量)

方案3是一种更为智能的方案,如果B系统运行良好,当前线程请求;如果B系统过载,则不请求,这样A系统将让B系统处于一种宕机与不宕机的临界状态,最大限度挖掘B系统性能。这种方案要求B系统提供一个性能评估接口返回Yes和No,Yes表示B系统良好,可以请求;No表示B系统情况不妙,不要请求。这个接口将被频繁调用,必须高效。

方案3的关键在于如何评估一个系统的运行状况。一个系统中当前主机的性能参数有CPU负载、内存使用率、Swap使用率、GC频率和GC时间、各个接口平均响应时间等,性能评估接口需要根据这些参数返回Yes或者No,是不是机器学习里的二分类问题?😄关于这个问题已经可以单独写篇文章讨论了,在这里就不展开了,你可以想一个比较简单傻瓜的保守策略,缺点是A系统的请求无法很好的逼近B系统的性能极限。

综合以上分析,方案2比较靠谱。如果选择方案3,建议由专门团队负责研究并提供统一的系统性能实时评估方案和工具。

应对分布式Cache宕机后的恢复
不要以为成功hold住分布式Cache宕机就万事大吉了,真正的考验是分布式Cache从宕机过程恢复之后,这时分布式Cache中什么都没有。

即使是上文中提到了基于刷新的异步续费策略这时也没用,因为分布式Cache为空,无论如何都要请求B系统。这时B系统的最大流量是Key的空间取值数量。

如果Key的取值空间数量很少,则相安无事;如果Key的取值空间数量大于B系统的流量上限,服务过载依然在所难免。

这种情况A系统很难处理,关键原因是A系统请求Cache返回Key对应Value为空,A系统无法知道是因为当前Cache是刚刚初始化,所有内容都为空;还是因为仅仅是自己请求的那个Key没在Cache里。

如果是前者,那么当前线程就要像处理Cache宕机那样进行某种策略的回避;如果是后者,直接请求B系统即可,因为这是正常的Cache使用流程。

对于Cache宕机的恢复,A系统真的无能为力,只能寄希望于B系统的方案了。

Server端的方案
相对于Client端需要应对各种复杂问题,Server端需要应对的问题非常简单,就是如何从容应对过载的问题。无论是缓存击穿也好,还是拒绝服务攻击也罢,对于Server端来说都是过载保护的问题。对于过载保护,主要给出两种可行方案,以及一种比较复杂的方案思路。

流量控制
流量控制就是B系统实时监控当前流量,如果超过预设的值或者系统承受能力,则直接拒绝掉一部分请求,以实现对系统的保护。

流量控制根据基于的数据不同,可分为两种:

基于流量阈值的流控:流量阈值是每个主机的流量上限,流量超过该阈值主机将进入不稳定状态。阈值提前进行设定,如果主机当前流量超过阈值,则拒绝掉一部分流量,使得实际被处理流量始终低于阈值。
基于主机状态的流控:每个接受每个请求之前先判断当前主机状态,如果主机状况不佳,则拒绝当前请求。
基于阈值的流控实现简单,但是最大的问题是需要提前设置阈值,而且随着业务逻辑越来越复杂,接口越来越多,主机的服务能力实际应该是下降的,这样就需要不断下调阈值,增加了维护成本,而且万一忘记调整的话,呵呵……

主机的阈值可以通过压力测试确定,选择的时候可以保守些。

基于主机状态的流控免去了人为控制,但是其最大的确定上文已经提到:如何根据当前主机各个参数判断主机状态呢?想要完美的回答这个问题目测并不容易,因此在没有太好答案之前,我推荐基于阈值的流控。

流量控制基于实现位置的不同,又可以分为两种:

反向代理实现流控:在反向代理如Nginx上基于各种策略进行流量控制。这种一般针对HTTP服务。
借助服务治理系统:如果Server端是RMI、RPC等服务,可以构建专门的服务治理系统进行负载均衡、流控等服务。
服务容器实现流控:在应用代码里,业务逻辑之前实现流量控制。
第3种在服务器的容器(如Java容器)中实现流控并不推荐,因为流控和业务代码混在一起容易混乱;其次实际上流量已经全量进入到了业务代码里,这时的流控只是阻止其进入真正的业务逻辑,所以流控效果将打折;还有,如果流量策略经常变动,系统将不得不为此经常更改。

因此,推荐前两种方式。

最后提一个注意点:当因为流控而拒绝请求时,务必在返回的数据中带上相关信息(比如“当前请求因为超出流量而被禁止访问”),如果返回值什么都没有将是一个大坑。因为造成调用方请求没有被响应的原因很多,可能是调用方Bug,也可能是服务方Bug,还可能是网络不稳定,这样一来很可能在排查一整天后发现是流控搞的鬼......

服务降级
服务降级一般由人为触发,属于服务过载造成崩溃恢复时的策略,但为了和流控对比,将其放到这里。

流量控制本质上是减小访问量,而服务处理能力不变;而服务降级本质上是降低了部分服务的处理能力,增强另一部分服务处理能力,而访问量不变。

服务降级是指在服务过载时关闭不重要的接口(直接拒绝处理请求),而保留重要的接口。比如服务由10个接口,服务降级时关闭了其中五个,保留五个,这时这个主机的服务处理能力将增强到二倍左右。

然而,服务过载发生时动辄就超出系统处理能力10倍,而服务降级能使主机服务处理能力提高10倍么?显然很困难,因此服务过载的应对不能只依靠服务降级策略。

动态扩展
动态扩展指的是在流量超过系统服务能力时,自动触发集群扩容,自动部署并上线运行;当流量过去后又自动回收多余机器,完全弹性。

这个方案是不是感觉很不错。但是目前互联网公司的在线应用跑在云上的本身就不多,要完全实现在线应用的自动化弹性运维,要走的路就更多了。

崩溃恢复
如果服务过载造成系统崩溃还是不幸发生了,这时需要运维控制流量,等后台系统启动完毕后循序渐进的放开流量,主要目的是让Cache慢慢预热。流量控制刚开始可以为10%,然后20%,然后50%,然后80%,最后全量,当然具体的比例,尤其是初始比例,还要看后端承受能力和前端流量的比例,各个系统并不相同。

如果后端系统有专门的工具进行Cache预热,则省去了运维的工作,等Cache热起来再发布后台系统即可。但是如果Cache中的Key空间很大,开发预热工具将比较困难。

结论
“防患于未然”放在服务过载的应对上也是适合的,预防为主,补救为辅。综合上文分析,具体的预防要点如下:

调用方(A系统)采用基于刷新的异步续费模式使用Cache,或者至少不能使用基于超时或刷新的简单(stupid)模式。
调用方(A系统)每次请求Cache时检查Cache是否可用(available),如果不可用则按照一个保守的概率访问后端,而不是无所顾忌的直接访问后端。
服务方(B系统)在反向代理处设置流量控制进行过载保护,阈值需要通过压测获得。
崩溃的补救主要还是靠运维和研发在发生时的通力合作:观察流量变化准确定位崩溃原因,运维控流量研发持续关注性能变化。

未来如果有条件的话可以研究下主机应用健康判断问题和动态弹性运维问题,毕竟自动化比人为操作要靠谱。

ngnix+fastdfs

FastDFS源代码:FastDFS_v3.11.tar.gz
nginx模块源代码:fastdfs-nginx-module_v1.09.tar.gz
nginx服务器源代码:nginx-1.10.1.tar.gz
nginx依赖的pcre库源代码:pcre-8.38.tar.gz

cp FastDFS_v3.11.tar.gz /usr/local/fdfs/
cd /usr/local/fdfs/
tar xvf FastDFS_v3.11.tar.gz 
cd FastDFS
./make.sh install
 ll /usr/local/bin/fdfs*
 iptables -I INPUT -p tcp -m state --state NEW -m tcp --dport 22122 -j ACCEPT
/etc/init.d/iptables save
mkdir -p /fdfs/storage
mkdir -p /fdfs/tracker

vim /etc/fdfs/tracker.conf

# is this config file disabled
# false for enabled
# true for disabled
disabled=false

# bind an address of this host
# empty for bind all addresses of this host
bind_addr=

# the tracker server port
port=22122

# connect timeout in seconds
# default value is 30s
connect_timeout=30

# network timeout in seconds
# default value is 30s
network_timeout=60

# the base path to store data and log files
base_path=/fdfs/tracker

# max concurrent connections this server supported
max_connections=256

# work thread count, should <= max_connections
# default value is 4
# since V2.00
work_threads=4

# the method of selecting group to upload files
#0: round robin
#1: specify group
#2: load balance, select the max free space group to upload file
store_lookup=2

# which group to upload file
# when store_lookup set to 1, must set store_group to the group name
store_group=group2

# which storage server to upload file
#0: round robin (default)
#1: the first server order by ip address
#2: the first server order by priority (the minimal)
store_server=0

# which path(means disk or mount point) of the storage server to upload file
#0: round robin
#2: load balance, select the max free space path to upload file
store_path=0

# which storage server to download file
#0: round robin (default)
#1: the source storage server which the current file uploaded to
download_server=0

# reserved storage space for system or other applications.
# if the free(available) space of any stoarge server in 
# a group <= reserved_storage_space, 
# no file can be uploaded to this group.
# bytes unit can be one of follows:
### G or g for gigabyte(GB)
### M or m for megabyte(MB)
### K or k for kilobyte(KB)
### no unit for byte(B)
reserved_storage_space = 4GB

#standard log level as syslog, case insensitive, value list:
### emerg for emergency
### alert
### crit for critical
### error
### warn for warning
### notice
### info
### debug
log_level=info

#unix group name to run this program, 
#not set (empty) means run by the group of current user
run_by_group=

#unix username to run this program,
#not set (empty) means run by current user
run_by_user=

# allow_hosts can ocur more than once, host can be hostname or ip address,
# "*" means match all ip addresses, can use range like this: 10.0.1.[1-15,20] or
# host[01-08,20-25].domain.com, for example:
# allow_hosts=10.0.1.[1-15,20]
# allow_hosts=host[01-08,20-25].domain.com
allow_hosts=*

# sync log buff to disk every interval seconds
# default value is 10 seconds
sync_log_buff_interval = 10

# check storage server alive interval seconds
check_active_interval = 120

# thread stack size, should >= 64KB
# default value is 64KB
thread_stack_size = 64KB

# auto adjust when the ip address of the storage server changed
# default value is true
storage_ip_changed_auto_adjust = true

# storage sync file max delay seconds
# default value is 86400 seconds (one day)
# since V2.00
storage_sync_file_max_delay = 86400

# the max time of storage sync a file
# default value is 300 seconds
# since V2.00
storage_sync_file_max_time = 300

# if use a trunk file to store several small files
# default value is false
# since V3.00
use_trunk_file = false 

# the min slot size, should <= 4KB
# default value is 256 bytes
# since V3.00
slot_min_size = 256

# the max slot size, should > slot_min_size
# store the upload file to trunk file when it's size <=  this value
# default value is 16MB
# since V3.00
slot_max_size = 16MB

# the trunk file size, should >= 4MB
# default value is 64MB
# since V3.00
trunk_file_size = 64MB

# if create trunk file advancely
# default value is false
# since V3.06
trunk_create_file_advance = false

# the time base to create trunk file
# the time format: HH:MM
# default value is 02:00
# since V3.06
trunk_create_file_time_base = 02:00

# the interval of create trunk file, unit: second
# default value is 38400 (one day)
# since V3.06
trunk_create_file_interval = 86400

# the threshold to create trunk file
# when the free trunk file size less than the threshold, will create 
# the trunk files
# default value is 0
# since V3.06
trunk_create_file_space_threshold = 20G

# if check trunk space occupying when loading trunk free spaces
# the occupied spaces will be ignored
# default value is false
# since V3.09
# NOTICE: set this parameter to true will slow the loading of trunk spaces 
# when startup. you should set this parameter to true when neccessary.
trunk_init_check_occupying = false

# if ignore storage_trunk.dat, reload from trunk binlog
# default value is false
# since V3.10
# set to true once for version upgrade when your version less than V3.10
trunk_init_reload_from_binlog = false

# HTTP settings
http.disabled=false

# HTTP port on this tracker server
http.server_port=8080

# check storage HTTP server alive interval seconds
# <= 0 for never check
# default value is 30
http.check_alive_interval=30

# check storage HTTP server alive type, values are:
#   tcp : connect to the storge server with HTTP port only, 
#        do not request and get response
#   http: storage check alive url must return http status 200
# default value is tcp
http.check_alive_type=tcp

# check storage HTTP server alive uri/url
# NOTE: storage embed HTTP server support uri: /status.html
http.check_alive_uri=/status.html

# if need find content type from file extension name
http.need_find_content_type=true

#use "#include" directive to include http other settings
##include http.conf

/usr/local/bin/restart.sh /usr/local/bin/fdfs_trackerd /etc/fdfs/tracker.conf

vim /etc/fdfs/storage.conf

# is this config file disabled
# false for enabled
# true for disabled
disabled=false

# the name of the group this storage server belongs to
group_name=group1

# bind an address of this host
# empty for bind all addresses of this host
bind_addr=

# if bind an address of this host when connect to other servers 
# (this storage server as a client)
# true for binding the address configed by above parameter: "bind_addr"
# false for binding any address of this host
client_bind=true

# the storage server port
port=23000

# connect timeout in seconds
# default value is 30s
connect_timeout=30

# network timeout in seconds
# default value is 30s
network_timeout=60

# heart beat interval in seconds
heart_beat_interval=30

# disk usage report interval in seconds
stat_report_interval=60

# the base path to store data and log files
base_path=/fdfs/storage

# max concurrent connections the server supported
# default value is 256
# more max_connections means more memory will be used
max_connections=256

# the buff size to recv / send data
# this parameter must more than 8KB
# default value is 64KB
# since V2.00
buff_size = 256KB

# work thread count, should <= max_connections
# work thread deal network io
# default value is 4
# since V2.00
work_threads=4

# if disk read / write separated
##  false for mixed read and write
##  true for separated read and write
# default value is true
# since V2.00
disk_rw_separated = true

# disk reader thread count per store base path
# for mixed read / write, this parameter can be 0
# default value is 1
# since V2.00
disk_reader_threads = 1

# disk writer thread count per store base path
# for mixed read / write, this parameter can be 0
# default value is 1
# since V2.00
disk_writer_threads = 1

# when no entry to sync, try read binlog again after X milliseconds
# must > 0, default value is 200ms
sync_wait_msec=50

# after sync a file, usleep milliseconds
#0 for sync successively (never call usleep)
sync_interval=0

# storage sync start time of a day, time format: Hour:Minute
# Hour from 0 to 23, Minute from 0 to 59
sync_start_time=00:00

# storage sync end time of a day, time format: Hour:Minute
# Hour from 0 to 23, Minute from 0 to 59
sync_end_time=23:59

# write to the mark file after sync N files
# default value is 500
write_mark_file_freq=500

# path(disk or mount point) count, default value is 1
store_path_count=1

# store_path#, based 0, if store_path0 not exists, it's value is base_path
# the paths must be exist
store_path0=/fdfs/storage
#store_path1=/home/yuqing/fastdfs2

# subdir_count  * subdir_count directories will be auto created under each 
# store_path (disk), value can be 1 to 256, default value is 256
subdir_count_per_path=256

# tracker_server can ocur more than once, and tracker_server format is
#  "host:port", host can be hostname or ip address
tracker_server=192.168.216.148:22122

#standard log level as syslog, case insensitive, value list:
### emerg for emergency
### alert
### crit for critical
### error
### warn for warning
### notice
### info
### debug
log_level=info

#unix group name to run this program, 
#not set (empty) means run by the group of current user
run_by_group=

#unix username to run this program,
#not set (empty) means run by current user
run_by_user=

# allow_hosts can ocur more than once, host can be hostname or ip address,
# "*" means match all ip addresses, can use range like this: 10.0.1.[1-15,20] or
# host[01-08,20-25].domain.com, for example:
# allow_hosts=10.0.1.[1-15,20]
# allow_hosts=host[01-08,20-25].domain.com
allow_hosts=*

# the mode of the files distributed to the data path
#0: round robin(default)
#1: random, distributted by hash code
file_distribute_path_mode=0

# valid when file_distribute_to_path is set to 0 (round robin), 
# when the written file count reaches this number, then rotate to next path
# default value is 100
file_distribute_rotate_count=100

# call fsync to disk when write big file
#0: never call fsync
# other: call fsync when written bytes >= this bytes
# default value is 0 (never call fsync)
fsync_after_written_bytes=0

# sync log buff to disk every interval seconds
# must > 0, default value is 10 seconds
sync_log_buff_interval=10

# sync binlog buff / cache to disk every interval seconds
# default value is 60 seconds
sync_binlog_buff_interval=10

# sync storage stat info to disk every interval seconds
# default value is 300 seconds
sync_stat_file_interval=300

# thread stack size, should >= 512KB
# default value is 512KB
thread_stack_size=512KB

# the priority as a source server for uploading file.
# the lower this value, the higher its uploading priority.
# default value is 10
upload_priority=10

# the NIC alias prefix, such as eth in Linux, you can see it by ifconfig -a
# multi aliases split by comma. empty value means auto set by OS type
# default values is empty
if_alias_prefix=

# if check file duplicate, when set to true, use FastDHT to store file indexes
#1 or yes: need check
#0 or no: do not check
# default value is 0
check_file_duplicate=0

# namespace for storing file indexes (key-value pairs)
# this item must be set when check_file_duplicate is true / on
key_namespace=FastDFS

# set keep_alive to 1 to enable persistent connection with FastDHT servers
# default value is 0 (short connection)
keep_alive=0

# you can use "#include filename" (not include double quotes) directive to 
# load FastDHT server list, when the filename is a relative path such as 
# pure filename, the base path is the base path of current/this config file.
# must set FastDHT server list when check_file_duplicate is true / on
# please see INSTALL of FastDHT for detail
##include /home/yuqing/fastdht/conf/fdht_servers.conf


#HTTP settings
http.disabled=false

# use the ip address of this storage server if domain_name is empty,
# else this domain name will ocur in the url redirected by the tracker server
http.domain_name=

# the port of the web server on this storage server
http.server_port=8888

http.trunk_size=256KB

# if need find content type from file extension name
http.need_find_content_type=true

#use "#include" directive to include HTTP other settings
##include http.conf


/usr/local/bin/restart.sh /usr/local/bin/fdfs_storaged /etc/fdfs/storage.conf
/usr/local/bin/restart.sh /usr/local/bin/fdfs_monitor /etc/fdfs/storage.conf 

vim /etc/fdfs/mod_fastdfs.conf

# connect timeout in seconds
# default value is 30s
connect_timeout=2

# network timeout in seconds
# default value is 30s
network_timeout=30

# the base path to store log files
base_path=/fdfs/tracker

# FastDFS tracker_server can ocur more than once, and tracker_server format is
#  "host:port", host can be hostname or ip address
tracker_server=192.168.216.148:22122

# the port of storage server
# the default value is 23000
storage_server_port=23000

# the group name of storage server
group_name=group1

# if uri including group name
# default value is false
url_have_group_name = true

# path(disk or mount point) count, default value is 1
store_path_count=1

# store_path#, based 0, if store_path0 not exists, it's value is base_path
# the paths must be exist
store_path0=/fdfs/storage
#store_path1=/home/yuqing/fastdfs1

#standard log level as syslog, case insensitive, value list:
### emerg for emergency
### alert
### crit for critical
### error
### warn for warning
### notice
### info
### debug
log_level=debug

# set the log filename, such as /usr/local/apache2/logs/mod_fastdfs.log
# empty for output to stderr (apache error_log file)
log_filename=

# response mode when the file not exist in local
## proxy: get the content from other storage server, then send to client
## redirect: redirect to other storage server (HTTP Header is Location)
response_mode=redirect

# the NIC alias prefix, such as eth in Linux, you can see it by ifconfig -a
# multi aliases split by comma. empty value means auto set by OS type
# this paramter used to get all ip address of the local host
# default values is empty
if_alias_prefix=

# if need find content type from file extension name
# should set to false because it done by apache
http.need_find_content_type=false

#use "#include" directive to include HTTP config file
#include http.conf

 cp nginx-1.10.1.tar.gz /usr/local/src
cp fastdfs-nginx-module_v1.09.tar.gz /usr/local/src
 cp pcre-8.38.tar.gz /usr/local/src
 cd /usr/local/src/
tar zxf nginx-1.10.1.tar.gz
 tar zxf fastdfs-nginx-module_v1.09.tar.gz
 tar zxf pcre-8.38.tar.gz
cd nginx-1.10.1

vim /usr/local/ngnix-1.10.1/conf

user  root;
worker_processes  1;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       mime.types;
    default_type  application/octet-stream;

    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;

    server {
        listen       8080;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;

        location / {
            root   html;
            index  index.html index.htm;
        }
        location /group1/M00 {
            root /fdfs/storage/data;
            ngx_fastdfs_module;
        } 
        location /group2/M00 {
            root /fdfs/storage/data;
            ngx_fastdfs_module;
        }

        #error_page  404              /404.html;

        # redirect server error pages to the static page /50x.html
        #
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }

        # proxy the PHP scripts to Apache listening on 127.0.0.1:80
        #
        #location ~ \.php$ {
        #    proxy_pass   http://127.0.0.1;
        #}

        # pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
        #
        #location ~ \.php$ {
        #    root           html;
        #    fastcgi_pass   127.0.0.1:9000;
        #    fastcgi_index  index.php;
        #    fastcgi_param  SCRIPT_FILENAME  /scripts$fastcgi_script_name;
        #    include        fastcgi_params;
        #}

        # deny access to .htaccess files, if Apache's document root
        # concurs with nginx's one
        #
        #location ~ /\.ht {
        #    deny  all;
        #}
    }


    # another virtual host using mix of IP-, name-, and port-based configuration
    #
    #server {
    #    listen       8000;
    #    listen       somename:8080;
    #    server_name  somename  alias  another.alias;

    #    location / {
    #        root   html;
    #        index  index.html index.htm;
    #    }
    #}


    # HTTPS server
    #
    #server {
    #    listen       443 ssl;
    #    server_name  localhost;

    #    ssl_certificate      cert.pem;
    #    ssl_certificate_key  cert.key;

    #    ssl_session_cache    shared:SSL:1m;
    #    ssl_session_timeout  5m;

    #    ssl_ciphers  HIGH:!aNULL:!MD5;
    #    ssl_prefer_server_ciphers  on;

    #    location / {
    #        root   html;
    #        index  index.html index.htm;
    #    }
    #}
}
./configure --prefix=/usr/local/ngnix-1.10.1 --with-pcre  --with-http_stub_status_module --with-http_ssl_module --with-http_gzip_static_module --with-http_realip_module --add-module=/usr/local/src/fastdfs-nginx-module/src
make && make install
/usr/local/ngnix-1.10.1 -s reload

mongodb 安装部署

windows

> cd D:\MongoDB\bin
> mongod.exe --dbpath D:\mongodb\data --logpath=D:\mongodb\logs\mongodb.log 

https://fastdl.mongodb.org/linux/mongodb-linux-i686-3.2.5.tgz

存储Mongodb安装
步骤一:解压mongodb-linux-x86_64-2.4.8.tgz压缩包
tar -zxvf mongodb-linux-x86_64-3.0.4.tgz
步骤二:创建数据目录和日志目录

          mkdir -p /mongo/data
                 Mkdir -p /mongo/log

步骤三:启动:进入mongodb解压目录(默认端口27017)
./bin/mongod -port 27017 --dbpath=/data/mongodb_data/ --logpath=//data/mongodb_log/server.log --fork

步骤四:验证是否启动成功
[root@localhost mongodb-linux-i686-3.0.4]# ./bin/mongo
MongoDB shell version: 3.0.4
connecting to: test
Welcome to the MongoDB shell.
For interactive help, type "help".
For more comprehensive documentation, see
http://docs.mongodb.org/
Questions? Try the support group
http://groups.google.com/group/mongodb-user
Server has startup warnings:
2015-07-07T15:35:16.124+0800 I CONTROL [initandlisten] ** WARNING: You are running this process as the root user, which is not recommended.
2015-07-07T15:35:16.125+0800 I CONTROL [initandlisten]
2015-07-07T15:35:16.125+0800 I CONTROL [initandlisten]
2015-07-07T15:35:16.125+0800 I CONTROL [initandlisten] ** NOTE: This is a 32 bit MongoDB binary.
2015-07-07T15:35:16.125+0800 I CONTROL [initandlisten] ** 32 bit builds are limited to less than 2GB of data (or less with --journal).
2015-07-07T15:35:16.125+0800 I CONTROL [initandlisten] ** Note that journaling defaults to off for 32 bit and is currently off.
2015-07-07T15:35:16.125+0800 I CONTROL [initandlisten] ** See http://dochub.mongodb.org/core/32bit
2015-07-07T15:35:16.125+0800 I CONTROL [initandlisten]

到安装目录bin下执行./mongo

  show dbs                     show database names   
  show collections             show collections in current database   
  show users                   show users in current database  
use msgs;//使用数据库

 db.msgsPush.find();//查询
 db.msgsPush.remove({});//删除所有数据

db.userInfo.find({"key":"18358144189"});
db.userInfo.find({"key":23})

java 异常集锦

问题:

Parameter metadata not available for the given statement

解决方法:
jdbcurl后面加上&generateSimpleParameterMetadata=true

问题:

windows下rabbitmq 遇到Error: unable to connect to node rabbit@WWW: nodedown

解决方法:
权限问题,装在非系统盘

问题:

maven依赖丢失和 Deployment Assembly 丢失

解决方法:
image

问题:

java.lang.NoClassDefFoundError: javax/servlet/jsp/jstl/core/Config

解决方法:

<dependency>
    <groupId>jstl</groupId>
    <artifactId>jstl</artifactId>
    <version>1.2</version>
</dependency>

问题:

@PathVariable中文乱码

解决方法:
1.String des = new String(s.getBytes("iso8859-1"),"UTF-8");
2.tomcat的conf/server.xml

<Connector connectionTimeout="20000" port="8080" protocol="HTTP/1.1" redirectPort="8443" URIEncoding="UTF-8"/>

问题:

Eclipse报

-Dmaven.multiModuleProjectDirectory system propery is not set. Check $M2_HOME environment variable and mvn script match.
解决方法:
可以设一个环境变量M2_HOME指向你的maven安装目录
M2_HOME=D:\Apps\apache-maven-3.3.1
然后在Window->Preference->Java->Installed JREs->Edit
在Default VM arguments中设置
-Dmaven.multiModuleProjectDirectory=$M2_HOME

linux问题:

-bash: test.asc: Permission denied

解决方法:
$ sudo sh -c 'echo "又一行信息" >> test.asc'

$ echo "第三条信息" | sudo tee -a test.asc

问题:emoji存储异常
解决方法:
#34

问题:# mybatis使用useGeneratedKeys="true" keyProperty="id"无法取得主键
分析:
如果在插入后马上取id,取出来是空,如果在事务外或者采用

<selectKey resultType="java.lang.Long" order="AFTER" keyProperty="id" >
       SELECT LAST_INSERT_ID()
    </selectKey>

是可以取得id的
解决方法:
对mybatis配置设置
<setting name="defaultExecutorType" value="REUSE" />

下面是附上的解释:
image

问题:

HttpClient 超过1M报 Going to buffer response body of large or unknown size. Using getResponseBodyAsStream instead is recommended.

答案:

InputStream inputStream = method.getResponseBodyAsStream();
//解决乱码问题
                BufferedReader br = new BufferedReader(new InputStreamReader(inputStream,"UTF-8"));
                StringBuffer stringBuffer = new StringBuffer();
                String str= "";
                while((str = br.readLine()) != null){
                stringBuffer .append(str );
                }

                stringBuffer.toString();

问题:

springboot 1.4.0 org.springframework.beans.factory.BeanCreationException: Error creating bean wit

h name 'entityManagerFactory' defined in class path resource
Caused by: java.io.FileNotFoundException: class path resource [] cannot be resolved to URL because it does not exist
at org.springframework.core.io.ClassPathResource.getURL(ClassPathResource.java:187) ~[spring-core-4.3.1.BUILD-SNAPSHOT.jar!/:4.3.1.BUILD-SNAPSHOT]
at org.springframework.orm.jpa.persistenceunit.DefaultPersistenceUnitManager.determineDefaultPersistenceUnitRootUrl(DefaultPersistenceUnitManager.java:600) ~[spring-orm-4.3.1.BUILD-SNAPSHOT.jar!/:4.3.1.BUILD-SNAPSHOT]
... 31 common frames omitted

解决方法
@EnableAutoConfiguration(exclude=HibernateJpaAutoConfiguration.class) must be set on the application class
spring.data.jpa.repositories.enabled=false must be set in the application properties/yml.
如果还是解决不了。升级到1.4.1吧!

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.1.BUILD-SNAPSHOT</version>
    </parent>
 <repositories>
        <repository>
            <id>spring-snapshots</id>
            <url>http://repo.spring.io/snapshot</url>
            <snapshots><enabled>true</enabled></snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <url>http://repo.spring.io/milestone</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <url>http://repo.spring.io/snapshot</url>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <url>http://repo.spring.io/milestone</url>
        </pluginRepository>
    </pluginRepositories>

spring工具类

内置的resouce类型

UrlResource 资料

ClassPathResource 资料

FileSystemResource 资料

ServletContextResource 资料

InputStreamResource 资料

ByteArrayResource 资料

EncodedResource 也就是Resource加上encoding, 可以认为是有编码的资源 资料

VfsResource(在jboss里经常用到, 相应还有 工具类 VfsUtils)

org.springframework.util.xml.ResourceUtils 用于处理表达资源字符串前缀描述资源的工具. 如: "classpath:".
有 getURL, getFile, isFileURL, isJarURL, extractJarFileURL

工具类

  • org.springframework.core.annotation.AnnotationUtils 处理注解
  • org.springframework.core.io.support.PathMatchingResourcePatternResolver 用 于处理 ant 匹配风格(com/.jsp, com/__/.jsp),找出所有的资源, 结合上面的resource的概念一起使用,对于遍历文件很有用. 具体请详细查看javadoc
  • org.springframework.core.io.support.PropertiesLoaderUtils 加载Properties资源工具类,和Resource结合
  • org.springframework.core.BridgeMethodResolver 桥接方法分析器. 关于桥接方法请参考: http://java.sun.com/docs/books/jls/third_edition/html/expressions.html#15.12.4.5
  • org.springframework.core.GenericTypeResolver 范型分析器, 在用于对范型方法, 参数分析.

- org.springframework.core.NestedExceptionUtils

xml工具

  • org.springframework.util.xml.AbstractStaxContentHandler
  • org.springframework.util.xml.AbstractStaxXMLReader
  • org.springframework.util.xml.AbstractXMLReader
  • org.springframework.util.xml.AbstractXMLStreamReader
  • org.springframework.util.xml.DomUtils
  • org.springframework.util.xml.SimpleNamespaceContext
  • org.springframework.util.xml.SimpleSaxErrorHandler
  • org.springframework.util.xml.SimpleTransformErrorListener
  • org.springframework.util.xml.StaxUtils
  • org.springframework.util.xml.TransformerUtils

其它工具集

  • org.springframework.util.xml.AntPathMatcherant风格的处理
  • org.springframework.util.xml.AntPathStringMatcher
  • org.springframework.util.xml.Assert断言,在我们的参数判断时应该经常用
  • org.springframework.util.xml.CachingMapDecorator
  • org.springframework.util.xml.ClassUtils用于Class的处理
  • org.springframework.util.xml.CollectionUtils用于处理集合的工具
  • org.springframework.util.xml.CommonsLogWriter
  • org.springframework.util.xml.CompositeIterator
  • org.springframework.util.xml.ConcurrencyThrottleSupport
  • org.springframework.util.xml.CustomizableThreadCreator
  • org.springframework.util.xml.DefaultPropertiesPersister
  • org.springframework.util.xml.DigestUtils摘要处理, 这里有用于md5处理信息的
  • org.springframework.util.xml.FileCopyUtils文件的拷贝处理, 结合Resource的概念一起来处理, 真的是很方便
  • org.springframework.util.xml.FileSystemUtils
  • org.springframework.util.xml.LinkedCaseInsensitiveMap
  • key值不区分大小写的LinkedMap
  • org.springframework.util.xml.LinkedMultiValueMap一个key可以存放多个值的LinkedMap
  • org.springframework.util.xml.Log4jConfigurer一个log4j的启动加载指定配制文件的工具类
  • org.springframework.util.xml.NumberUtils处理数字的工具类, 有parseNumber 可以把字符串处理成我们指定的数字格式, 还支持format格式, convertNumberToTargetClass 可以实现Number类型的转化.
  • org.springframework.util.xml.ObjectUtils有很多处理null object的方法. 如nullSafeHashCode, nullSafeEquals, isArray, containsElement, addObjectToArray, 等有用的方法
  • org.springframework.util.xml.PatternMatchUtilsspring里用于处理简单的匹配. 如 Spring's typical "xxx_", "_xxx" and "xxx" pattern styles
  • org.springframework.util.xml.PropertyPlaceholderHelper用于处理占位符的替换
  • org.springframework.util.xml.ReflectionUtils反映常用工具方法. 有 findField, setField, getField, findMethod, invokeMethod等有用的方法
  • org.springframework.util.xml.SerializationUtils用于java的序列化与反序列化. serialize与deserialize方法
  • org.springframework.util.xml.StopWatch一个很好的用于记录执行时间的工具类, 且可以用于任务分阶段的测试时间. 最后支持一个很好看的打印格式. 这个类应该经常用 资料
  • org.springframework.util.xml.StringUtils
  • org.springframework.util.xml.SystemPropertyUtils
  • org.springframework.util.xml.TypeUtils用于类型相容的判断. isAssignable
  • org.springframework.util.xml.WeakReferenceMonitor弱引用的监控

和web相关的工具

  • org.springframework.web.util.CookieGenerator
  • org.springframework.web.util.HtmlCharacterEntityDecoder
  • org.springframework.web.util.HtmlCharacterEntityReferences
  • org.springframework.web.util.HtmlUtils
  • org.springframework.web.util.HttpUrlTemplate
    这个类用于用字符串模板构建url, 它会自动处理url里的汉字及其它相关的编码. 在读取别人提供的url资源时, 应该经常用
    String url = "http://localhost/myapp/{name}/{id}"
  • org.springframework.web.util.JavaScriptUtils
  • org.springframework.web.util.Log4jConfigListener
  • 用listener的方式来配制log4j在web环境下的初始化
  • org.springframework.web.util.UriTemplate
  • org.springframework.web.util.UriUtils处理uri里特殊字符的编码
  • org.springframework.web.util.WebUtils
    org.springframework.web.util.

4.EncodedResource(Resource对象,"UTF-8") 编码资源(特殊的);

5.WebApplicationContextUtils

6.StringEscapeutils 编码解码

ReentrantLock源码阅读

ReentrantLock是Lock的一种实现,是一种可重入的互斥锁
看一下构造函数

    private final Sync sync;
    public ReentrantLock() {
        sync = new NonfairSync();
    }

Sync是一个抽象内部类,NonfairSync继承于它,是一种不公平的同步机制,
FairSync是一种公平的同步机制
另一种构造方式

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

以不公平锁为例。

lock

image
获取锁,如果锁无法获取,那么当前的线程就变为不可被调度,直到锁被获取到。

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

compareAndSetState来自于抽象父类AbstractQueuedSynchronizer

首先,AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer,AbstractOwnableSynchronizer的实现很简单,它表示独占的同步器,内部使用变量exclusiveOwnerThread表示独占的线程。

其次,AbstractQueuedSynchronizer内部使用CLH锁队列来将并发执行变成串行执行。整个队列是一个双向链表。每个CLH锁队列的节点,会保存前一个节点和后一个节点的引用,当前节点对应的线程,以及一个状态。这个状态用来表明该线程是否应该block。当节点的前一个节点被释放的时候,当前节点就被唤醒,成为头部。新加入的节点会放在队列尾部。

    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

setExclusiveOwnerThread就是设置当前线程不可被调度,设置成员
private transient Thread exclusiveOwnerThread;

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire实际调用了

    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

lockInterruptibly

获取锁,除非当前线程被中断。如果获取到了锁,那么立即返回,如果获取不到,那么当前线程变得不可被调度,一直休眠直到下面两件事情发生:
1、当前线程获取到了锁
2、其他的线程中断了当前的线程

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

tryLock(其实就是调用了nonfairTryAcquire)

如果调用的时候能够获取锁,那么就获取锁并且返回true,如果当前的锁无法获取到,那么这个方法会立刻返回false

unlock

image

释放当前线程占用的锁

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

newCondition

返回一个与当前的锁关联的条件变量。在使用这个条件变量之前,当前线程必须占用锁。调用Condition的await方法,会在等待之前原子地释放锁,并在等待被唤醒后原子的获取锁
返回一个叫ConditionObject的对象

公平锁和非公平锁的区别

公平锁和非公平锁,在CHL队列抢占模式上都是一致的,也就是在进入acquireQueued这个方法之后都一样,它们的区别在初次抢占上有区别,也就是tryAcquire上的区别,下面是两者内部调用关系的简图:

NonfairSync

lock —> compareAndSetState
                | —> setExclusiveOwnerThread
      —> accquire
             | —> tryAcquire
                           |—>nonfairTryAcquire
                |—> acquireQueued

FairSync

lock —> acquire
               | —> tryAcquire
                           |—>!hasQueuePredecessors
                           |—>compareAndSetState
                           |—>setExclusiveOwnerThread
               |—> acquireQueued

真正的区别就是公平锁多了hasQueuePredecessors这个方法,这个方法用于判断CHL队列中是否有节点,对于公平锁,如果CHL队列有节点,则新进入竞争的线程一定要在CHL上排队,而非公平锁则是无视CHL队列中的节点,直接进行竞争抢占,这就有可能导致CHL队列上的节点永远获取不到锁,这就是非公平锁之所以不公平的原因。

参考:http://www.codeceo.com/article/reentrantlock-learn.html

RabbitMQ命令

查看所有队列信息

rabbitmqctl list_queues

关闭应用

rabbitmqctl stop_app

启动应用,和上述关闭命令配合使用,达到清空队列的目的

rabbitmqctl start_app

清除所有队列

rabbitmqctl reset

添加用户:

rabbitmqctl add_user root root

设置权限

rabbitmqctl set_permissions -p / root "." "." ".*"

查看用户

rabbitmqctl list_users

列出所有exchange

rabbitmqctl list_exchanges

列出所有queues

rabbitmqctl list_queues

tomcat 启用gzip

编辑conf/server.xml文件

<Connector
port="8080"               maxHttpHeaderSize="8192"
               maxThreads="150" minSpareThreads="25" maxSpareThreads="75"
               enableLookups="false" redirectPort="8443" acceptCount="100"
               connectionTimeout="20000" disableUploadTimeout="true"
      compression="on"
compressionMinSize="2048"
noCompressionUserAgents="gozilla,traviata" 
compressableMimeType="text/html,text/xml,text/javascript,text/css,text/plain" />
  1. compression="on" 打开压缩功能
  2. compressionMinSize="2048" 启用压缩的输出内容大小,这里面默认为2KB
  3. noCompressionUserAgents="gozilla, traviata" 对于以下的浏览器,不启用压缩<60;
  4. compressableMimeType="text/html,text/xml" 压缩类型

http://hongjiang.info/index/tomcat/

程序中如何减少if

原文:http://code.joejag.com/2016/anti-if-the-missing-patterns.html

似乎写代码的时候经常会遇到如下:

public void theProblem(booleansomeCondition){
//SharedState

if(someCondition){
//CodeBlockA
}else{
//CodeBlockB
}

如何少写if?

案例一,采用boolean的方法:

public void example(){
   FileUtils.createFile("name.txt","filecontents",false);
   FileUtils.createFile("name_temp.txt","filecontents",true);
}

public class FileUtils{
    public static void createFile(String name,String contents,boolean temporary){
        if(temporary){
            //savetempfile
        }else{
            //savepermanentfile
        }
    }
}

解决方法,拆分成两个方法

public void example(){
  FileUtils.createFile("name.txt","filecontents");
  FileUtils.createTemporaryFile("name_temp.txt","filecontents");
}

public class FileUtils{
  public static void createFile(String name,String contents){
    //savepermanentfile
  }

  public static void createTemporaryFile(String name,String contents){
    //savetempfile
  }
}

案例二,采用多态:

public class Bird{
private enum Species{
    EUROPEAN,AFRICAN,NORWEGIAN_BLUE;
}

private boolean isNailed;
private Species type;

public double getSpeed(){
    switch(type){
        case EUROPEAN:
            return getBaseSpeed();
        case AFRICAN:
            return getBaseSpeed() - getLoadFactor();
        case NORWEGIAN_BLUE:
            return isNailed ? 0 : getBaseSpeed();
        default:
            return 0 ;
    }
}

private double getLoadFactor(){
    return 3 ;
}

private double getBaseSpeed(){
    return 10 ;
}
}

解决方法

public abstract class Bird {

    public abstract double getSpeed();

    protected double getLoadFactor() {
        return 3;
    }

    protected double getBaseSpeed() {
        return 10;
    }
}

public class EuropeanBird extends Bird {
    public double getSpeed() {
        return getBaseSpeed();
    }
}

public class AfricanBird extends Bird {
    public double getSpeed() {
        return getBaseSpeed() - getLoadFactor();
    }
}

public class NorwegianBird extends Bird {
    private boolean isNailed;

    public double getSpeed() {
        return isNailed ? 0 : getBaseSpeed();
    }
}

案例三:NullObject/Optional

public void example() {
    sumOf(null);
}

private int sumOf(List<Integer> numbers) {
    if(numbers == null) {
        return 0;
    }

    return numbers.stream().mapToInt(i -> i).sum();
}

防范方法当然是不要传入null

public void example() {
    sumOf(new ArrayList<>());
}

private int sumOf(List<Integer> numbers) {
    return numbers.stream().mapToInt(i -> i).sum();
}

案例四:将内联语句(Inline statements)转为表达式

public boolean horrible(boolean foo, boolean bar, boolean baz) {
    if (foo) {
        if (bar) {
            return true;
        }
    }

    if (baz) {
        return true;
    } else {
        return false;
    }
}
public boolean horrible(boolean foo, boolean bar, boolean baz) {
    return foo && bar || baz;
}

案例五:给出应对策略

public class Repository {
    public String getRecord(int id) {
        return null; // cannot find the record
    }
}

public class Finder {
    public String displayRecord(Repository repository) {
        String record = repository.getRecord(123);
        if(record == null) {
            return "Not found";
        } else {
            return record;
        }
    }
}

解决方法

private class Repository {
    public String getRecord(int id, String defaultValue) {
        String result = Db.getRecord(id);

        if (result != null) {
            return result;
        }

        return defaultValue;
    }
}

public class Finder {
    public String displayRecord(Repository repository) {
        return repository.getRecord(123, "Not found");
    }
}

maven

原文地址: http://www.infoq.com/cn/author/%E8%AE%B8%E6%99%93%E6%96%8C

这里只是摘录点笔记

坐标的原则

<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.8.2</version>
  <scope>test</scope>
</dependency>

滥用坐标、错用坐标的样例比比皆是,在**仓库中我们能看到SpringFramework有两种坐标,其一是直接使用springframework作为groupId,如springframework:spring-beans:1.2.6,另一种是用org.springframework作为groupId,如org.springframework:spring-beans:2.5。细心看看,前一种方式显得比较随意,后一种方式则是基于域名衍生出来的,显然后者更合理,因为用户能一眼根据域名联想到其Maven坐标,方便寻找。因此新版本的SpringFramework构件都使用org.springframework作为groupId。由这个例子我们可以看到坐标规划一个原则是基于项目域名衍生。其实很多流行的开源项目都破坏了这个原则,例如JUnit,这是因为Maven社区在最开始接受构件并部署到**仓库的时候,没有很很严格的限制,而对于这些流行的项目来说,一时间更改坐标会影响大量用户,因此也算是个历史遗留问题了。

还有一个常见的问题是将groupId直接匹配到公司或者组织名称,因为乍一看这是显而易见的。例如组织是zoo.com,有个项目是dog,那有些人就直接使用groupId com.zoo了。如果项目只有一个模块,这是没有什么问题的,但现实世界的项目往往会有很多模块,Maven的一大长处就是通过多模块的方式管理项目。那dog项目可能会有很多模块,我们用坐标的哪个部分来定义模块呢?groupId显然不对,version也不可能是,那只有artifactId。因此要这里有了另外一个原则,用artifactId来定义模块,而不是定义项目。接下来,很显然的,项目就必须用groupId来定义。因此对于dog项目来说,应该使用groupId com.zoo.dog,不仅体现出这是zoo.com下的一个项目,而且可以与该组织下的其他项目如com.zoo.cat区分开来。

除此之外,artifactId的定义也有最佳实践,我们常常可以看到一个项目有很多的模块,例如api,dao,service,web等等。Maven项目在默认情况下生成的构件,其名称不会是基于artifactId,version和packaging生成的,例如api-1.0.jar,dao-1.0.jar等等,他们不会带有groupId的信息,这会造成一个问题,例如当我们把所有这些构件放到Web容器下的时候,你会发现项目dog有api-1.0.jar,项目cat也有api-1.0.jar,这就造成了冲突。更坏的情况是,dog项目有api-1.0.jar,cat项目有api-2.0.jar,其实两者没什么关系,可当放在一起的时候,却很容易让人混淆。为了让坐标更加清晰,又出现了一个原则,即在定义artiafctId时也加入项目的信息,例如dog项目的api模块,那就使用artifactId dog-api,其他就是dog-dao,dao-service等等。虽然连字号是不允许出现在Java的包名中的,但Maven没这个限制。现在dog-api-1.0.jar,cat-2.0.jar被放在一起时,就不容易混淆了。

关于坐标,我们还没谈到version,这里不再详述因为读者可以从Maven: The Complete Guide中找到详细的解释,简言之就是使用这样一个格式:

<主版本>.<次版本>.<增量版本>-<限定符>
其中主版本主要表示大型架构变更,次版本主要表示特性的增加,增量版本主要服务于bug修复,而限定符如alpha、beta等等是用来表示里程碑。当然不是每个项目的版本都要用到这些4个部分,根据需要选择性的使用即可。在此基础上Maven还引入了SNAPSHOT的概念,用来表示活动的开发状态,由于不涉及坐标规划,这里不进行详述。不过有点要提醒的是,由于SNAPSHOT的存在,自己显式地在version中使用时间戳字符串其实没有必要。

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <version>2.3.2</version>
  <configuration>
    <source>1.5</source>
    <target>1.5</target>
  </configuration>
</plugin>

mvn dependency:analyze
分析依赖

最后,还一些重要的POM内容通常被大多数项目所忽略,这些内容不会影响项目的构建,但能方便信息的沟通,它们包括项目URL,开发者信息,SCM信息,持续集成服务器信息等等,这些信息对于开源项目来说尤其重要。对于那些想了解项目的人来说,这些信息能他们帮助找到想要的信息,基于这些信息生成的Maven站点也更有价值。相关的POM配置很简单,如:

<project>
  <description>...</description>
  <url>...</url>
  <licenses>...</licenses>
  <organization>...</organization>
  <developers>...</developers>
  <issueManagement>...</issueManagement>
  <ciManagement>...</ciManagement>
  <mailingLists>...</mailingLists>
  <scm>...</scm>
</project>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactid>spring-beans</artifactId>
  <version>2.5</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactid>spring-context</artifactId>
  <version>2.5</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactid>spring-core</artifactId>
  <version>2.5</version>
</dependency>

你会在一个项目中使用不同版本的SpringFramework组件么?答案显然是不会。因此这里就没必要重复写三次2.5,使用Maven属性将2.5提取出来如下:

<properties>
  <spring.version>2.5</spring.version>
</properties>
<depencencies>
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactid>spring-beans</artifactId>
    <version>${spring.version}</version>
  </dependency>
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactid>spring-context</artifactId>
    <version>${spring.version}</version>
  </dependency>
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactid>spring-core</artifactId>
    <version>${spring.version}</version>
  </dependency>
</depencencies>

现在2.5只出现在一个地方,虽然代码稍微长了点,但重复消失了,日后升级依赖版本的时候,只需要修改一处,而且也能避免漏掉升级某个依赖。

读者可能已经非常熟悉这个例子了,我这里再啰嗦一遍是为了给后面做铺垫,多模块POM重构的目的和该例一样,也是为了消除重复,模块越多,潜在的重复就越多,重构就越有必要。

dependencyManagement只会影响现有依赖的配置,但不会引入依赖。例如我们可以在父模块中配置如下:

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactid>junit</artifactId>
      <version>4.8.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactid>log4j</artifactId>
      <version>1.2.16</version>
    </dependency>
  </dependencies>
</dependencyManagement>

这段配置不会给任何子模块引入依赖,但如果某个子模块需要使用JUnit和Log4j的时候,我们就可以简化依赖配置成这样:

  <dependency>
    <groupId>junit</groupId>
    <artifactid>junit</artifactId>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactid>log4j</artifactId>
  </dependency>

你可以把dependencyManagement放到单独的专门用来管理依赖的POM中,然后在需要使用依赖的模块中通过import scope依赖,就可以引入dependencyManagement。例如可以写这样一个用于依赖管理的POM:

<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.juvenxu.sample</groupId>
  <artifactId>sample-dependency-infrastructure</artifactId>
  <packaging>pom</packaging>
  <version>1.0-SNAPSHOT</version>
  <dependencyManagement>
    <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactid>junit</artifactId>
          <version>4.8.2</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>log4j</groupId>
          <artifactid>log4j</artifactId>
          <version>1.2.16</version>
        </dependency>
    </dependencies>
  </dependencyManagement>
</project>

然后我就可以通过非继承的方式来引入这段依赖管理配置:

  <dependencyManagement>
    <dependencies>
        <dependency>
          <groupId>com.juvenxu.sample</groupId>
          <artifactid>sample-dependency-infrastructure</artifactId>
          <version>1.0-SNAPSHOT</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
    </dependencies>
  </dependencyManagement>

  <dependency>
    <groupId>junit</groupId>
    <artifactid>junit</artifactId>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactid>log4j</artifactId>
  </dependency>

这样,父模块的POM就会非常干净,由专门的packaging为pom的POM来管理依赖,也契合的面向对象设计中的单一职责原则。此外,我们还能够创建多个这样的依赖管理POM,以更细化的方式管理依赖。这种做法与面向对象设计中使用组合而非继承也有点相似的味道。

消除多模块插件配置重复

与dependencyManagement类似的,我们也可以使用pluginManagement元素管理插件。一个常见的用法就是我们希望项目所有模块的使用Maven Compiler Plugin的时候,都使用Java 1.5,以及指定Java源文件编码为UTF-8,这时可以在父模块的POM中如下配置pluginManagement:

<build>
  <pluginManagement>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>1.5</source>
          <target>1.5</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
    </plugins>
  </pluginManagement>
</build>

这段配置会被应用到所有子模块的maven-compiler-plugin中,由于Maven内置了maven-compiler-plugin与生命周期的绑定,因此子模块就不再需要任何maven-compiler-plugin的配置了。

持续集成?

  • 只维护一个源码仓库
  • 让构建自行测试
  • 每人每天向主干提交代码
  • 每次提交都应在持续集成机器上构建主干
  • 保持快速的构建
  • 在模拟生产环境中测试
  • 让每个人都能轻易获得最新的可执行文件
  • 每个人都能看到进度
  • 自动化部署

现在,我们希望Maven在integration-test阶段执行所有以IT结尾命名的测试类,配置Maven Surefire Plugin如下:

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.7.2</version>
        <executions>
          <execution>
            <id>run-integration-test</id>
            <phase>integration-test</phase>
            <goals>
              <goal>test</goal>
            </goals>
            <configuration>
              <includes>
                <include>**/*IT.java</include>
              </includes>
            </configuration>
          </execution>
        </executions>
      </plugin>

对应于同样的package生命周期阶段,Maven为jar项目调用了maven-jar-plugin,为war项目调用了maven-war-plugin,换言之,packaging直接影响Maven的构建生命周期。了解这一点非常重要,特别是当你需要自定义打包行为的时候,你就必须知道去配置哪个插件。一个常见的例子就是在打包war项目的时候排除某些web资源文件,这时就应该配置maven-war-plugin如下:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-war-plugin</artifactId>
    <version>2.1.1</version>
    <configuration>
      <webResources>
        <resource>
          <directory>src/main/webapp</directory>
          <excludes>
            <exclude>**/*.jpg</exclude>
          </excludes>
        </resource>
      </webResources>
    </configuration>
  </plugin>

本专栏的《坐标规划》一文中曾解释过,一个Maven项目只生成一个主构件,当需要生成其他附属构件的时候,就需要用上classifier。源码包和Javadoc包就是附属构件的极佳例子。它们有着广泛的用途,尤其是源码包,当你使用一个第三方依赖的时候,有时候会希望在IDE中直接进入该依赖的源码查看其实现的细节,如果该依赖将源码包发布到了Maven仓库,那么像Eclipse就能通过m2eclipse插件解析下载源码包并关联到你的项目中,十分方便。由于生成源码包是极其常见的需求,因此Maven官方提供了一个插件来帮助用户完成这个任务:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-source-plugin</artifactId>
    <version>2.1.2</version>
    <executions>
      <execution>
        <id>attach-sources</id>
        <phase>verify</phase>
        <goals>
          <goal>jar-no-fork</goal>
        </goals>
      </execution>
    </executions>
  </plugin>

类似的,生成Javadoc包只需要配置插件如下:

  <plugin>          
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-javadoc-plugin</artifactId>
    <version>2.7</version>
    <executions>
      <execution>
        <id>attach-javadocs</id>
          <goals>
            <goal>jar</goal>
          </goals>
      </execution>
    </executions>
  </plugin>    

为了帮助所有Maven用户更方便的使用Maven**库中海量的资源,**仓库的维护者强制要求开源项目提交构件的时候同时提供源码包和Javadoc包。这是个很好的实践,读者也可以尝试在自己所处的公司内部实行,以促进不同项目之间的交流。

可执行CLI包

除了前面提到了常规JAR包、WAR包,源码包和Javadoc包,另一种常被用到的包是在命令行可直接运行的CLI(Command Line)包。默认Maven生成的JAR包只包含了编译生成的.class文件和项目资源文件,而要得到一个可以直接在命令行通过java命令运行的JAR文件,还要满足两个条件:

JAR包中的/META-INF/MANIFEST.MF元数据文件必须包含Main-Class信息。
项目所有的依赖都必须在Classpath中。
Maven有好几个插件能帮助用户完成上述任务,不过用起来最方便的还是maven-shade-plugin,它可以让用户配置Main-Class的值,然后在打包的时候将值填入/META-INF/MANIFEST.MF文件。关于项目的依赖,它很聪明地将依赖JAR文件全部解压后,再将得到的.class文件连同当前项目的.class文件一起合并到最终的CLI包中,这样,在执行CLI JAR文件的时候,所有需要的类就都在Classpath中了。下面是一个配置样例:

  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <executions>
      <execution>
        <phase>package</phase>
        <goals>
          <goal>shade</goal>
        </goals>
        <configuration>
          <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.juvenxu.mavenbook.HelloWorldCli</mainClass>
            </transformer>
          </transformers>
        </configuration>
      </execution>
    </executions>
  </plugin>

上述例子中的,我的Main-Class是com.juvenxu.mavenbook.HelloWorldCli,构建完成后,对应于一个常规的hello-world-1.0.jar文件,我还得到了一个hello-world-1.0-cli.jar文件。细心的读者可能已经注意到了,这里用的是cli这个classifier。最后,我可以通过java -jar hello-world-1.0-cli.jar命令运行程序。

自定义格式包

实际的软件项目常常会有更复杂的打包需求,例如我们可能需要为客户提供一份产品的分发包,这个包不仅仅包含项目的字节码文件,还得包含依赖以及相关脚本文件以方便客户解压后就能运行,此外分发包还得包含一些必要的文档。这时项目的源码目录结构大致是这样的:

pom.xml
src/main/java/
src/main/resources/
src/test/java/
src/test/resources/
src/main/scripts/
src/main/assembly/
README.txt
除了基本的pom.xml和一般Maven目录之外,这里还有一个src/main/scripts/目录,该目录会包含一些脚本文件如run.sh和run.bat,src/main/assembly/会包含一个assembly.xml,这是打包的描述文件,稍后介绍,最后的README.txt是份简单的文档。

我们希望最终生成一个zip格式的分发包,它包含如下的一个结构:

bin/
lib/
README.txt
其中bin/目录包含了可执行脚本run.sh和run.bat,lib/目录包含了项目JAR包和所有依赖JAR,README.txt就是前面提到的文档。

描述清楚需求后,我们就要搬出Maven最强大的打包插件:maven-assembly-plugin。它支持各种打包文件格式,包括zip、tar.gz、tar.bz2等等,通过一个打包描述文件(该例中是src/main/assembly.xml),它能够帮助用户选择具体打包哪些文件集合、依赖、模块、和甚至本地仓库文件,每个项的具体打包路径用户也能自由控制。如下就是对应上述需求的打包描述文件src/main/assembly.xml:

<assembly>
  <id>bin</id>
  <formats>
    <format>zip</format>
  </formats>
  <dependencySets>
    <dependencySet>
      <useProjectArtifact>true</useProjectArtifact>
      <outputDirectory>lib</outputDirectory>
    </dependencySet>
  </dependencySets>
  <fileSets>
    <fileSet>
      <outputDirectory>/</outputDirectory>
      <includes>
        <include>README.txt</include>
      </includes>
    </fileSet>
    <fileSet>
      <directory>src/main/scripts</directory>
      <outputDirectory>/bin</outputDirectory>
      <includes>
        <include>run.sh</include>
        <include>run.bat</include>
      </includes>
    </fileSet>
  </fileSets>
</assembly>

首先这个assembly.xml文件的id对应了其最终生成文件的classifier。
其次formats定义打包生成的文件格式,这里是zip。因此结合id我们会得到一个名为hello-world-1.0-bin.zip的文件。(假设artifactId为hello-world,version为1.0)
dependencySets用来定义选择依赖并定义最终打包到什么目录,这里我们声明的一个depenencySet默认包含所有所有依赖,而useProjectArtifact表示将项目本身生成的构件也包含在内,最终打包至输出包内的lib路径下(由outputDirectory指定)。
fileSets允许用户通过文件或目录的粒度来控制打包。这里的第一个fileSet打包README.txt文件至包的根目录下,第二个fileSet则将src/main/scripts下的run.sh和run.bat文件打包至输出包的bin目录下。
打包描述文件所支持的配置远超出本文所能覆盖的范围,为了避免读者被过多细节扰乱思维,这里不再展开,读者若有需要可以去参考这份文档。

最后,我们需要配置maven-assembly-plugin使用打包描述文件,并绑定生命周期阶段使其自动执行打包操作:

  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>2.2.1</version>
    <configuration>
      <descriptors>
        <descriptor>src/main/assembly/assembly.xml</descriptor>
      </descriptors>
    </configuration>
    <executions>
      <execution>
        <id>make-assembly</id>
        <phase>package</phase>
        <goals>
          <goal>single</goal>
        </goals>
      </execution>
    </executions>
  </plugin>

运行mvn clean package之后,我们就能在target/目录下得到名为hello-world-1.0-bin.zip的分发包了。

springtransaction 事务

事务传播属性

    @Transactional(propagation=Propagation.REQUIRED) //如果有事务,那么加入事务,没有的话新建一个(不写的情况下)  **默认**
    @Transactional(propagation=Propagation.NOT_SUPPORTED) //容器不为这个方法开启事务  
    @Transactional(propagation=Propagation.REQUIRES_NEW) //不管是否存在事务,都创建一个新的事务,原来的挂起,新的执行完毕,继续执行老的事务  
    @Transactional(propagation=Propagation.MANDATORY) //必须在一个已有的事务中执行,否则抛出异常  
    @Transactional(propagation=Propagation.NEVER) //必须在一个没有的事务中执行,否则抛出异常(与Propagation.MANDATORY相反)  
    @Transactional(propagation=Propagation.SUPPORTS) //如果其他bean调用这个方法,在其他bean中声明事务,那就用事务.如果其他bean没有声明事务,那就不用事务.  
    @Transactional(propagation=Propagation.NESTED)   //如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则进行与PROPAGATION_REQUIRED类似的操作。 
    @Transactional (propagation = Propagation.REQUIRED,readOnly=true) //readOnly=true只读,不能更新,删除   
    @Transactional (propagation = Propagation.REQUIRED,timeout=30)//设置超时时间   
    @Transactional (propagation = Propagation.REQUIRED,isolation=Isolation.DEFAULT)//设置数据库隔离级别  

xml配置:

  <!-- hibernate事务管理器 -->  
    <bean id="transactionManager"
        class="org.springframework.orm.hibernate3.HibernateTransactionManager">
        <property name="sessionFactory" ref="sessionFactory" />
    </bean>
    
<!-- mybatis配置事务管理 -->  
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">       
          <property name="dataSource" ref="dataSource"></property>  
    </bean> 

1.直接配置

    <bean id="userDao"  
        class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">  
           <!-- 配置事务管理器 -->  
           <property name="transactionManager" ref="transactionManager" />     
        <property name="target" ref="userDaoTarget" />  
         <property name="proxyInterfaces" value="com.absurd.xxxDao" />
        <!-- 配置事务属性 -->  
        <property name="transactionAttributes">  
            <props>  
                <prop key="*">PROPAGATION_REQUIRED</prop>
            </props>  
        </property>  
    </bean>  

2.共享一个代理基类

<bean id="transactionBase"  
            class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean"  
            lazy-init="true" abstract="true">  
        <!-- 配置事务管理器 -->  
        <property name="transactionManager" ref="transactionManager" />  
        <!-- 配置事务属性 -->  
        <property name="transactionAttributes">  
            <props>  
                <prop key="*">PROPAGATION_REQUIRED</prop>  
            </props>  
        </property>  
    </bean>    
   
    
    <bean id="userDao" parent="transactionBase" >  
        <property name="target" ref="userDaoTarget" />   
    </bean>
 <bean id="transactionInterceptor"  
       class="org.springframework.transaction.interceptor.TransactionInterceptor">  
       <property name="transactionManager" ref="transactionManager" />  
       <!-- 配置事务属性 -->  
       <property name="transactionAttributes">  
           <props>  
               <prop key="*">PROPAGATION_REQUIRED</prop>  
           </props>  
       </property>  
   </bean>
     
   <bean class="org.springframework.aop.framework.autoproxy.BeanNameAutoProxyCreator">  
       <property name="beanNames">  
           <list>  
               <value>*Dao</value>
           </list>  
       </property>  
       <property name="interceptorNames">  
           <list>  
               <value>transactionInterceptor</value>  
           </list>  
       </property>  
   </bean>  

4.使用tx标签配置的拦截器

 <tx:advice id="txAdvice" transaction-manager="transactionManager">
       <tx:attributes>
           <tx:method name="*" propagation="REQUIRED" />
       </tx:attributes>
   </tx:advice>
   
   <aop:config>
       <aop:pointcut id="interceptorPointCuts"
           expression="execution(* com.absurd.*.*(..))" />
       <aop:advisor advice-ref="txAdvice"
           pointcut-ref="interceptorPointCuts" />        
   </aop:config>      

5.全注解

<tx:annotation-driven transaction-manager="transactionManager"/>

6.编程:

DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);

TransactionStatus status = txManager.getTransaction(def);
try {
  userMapper.insertUser(user);
}
catch (MyException ex) {
  txManager.rollback(status);
  throw ex;
}
txManager.commit(status);

TransactionStatus

	boolean isNewTransaction();

	boolean hasSavepoint();

	void setRollbackOnly();

	boolean isRollbackOnly();

	void flush();

	boolean isCompleted();

PlatformTransactionManager

	TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;

	void commit(TransactionStatus status) throws TransactionException;

	void rollback(TransactionStatus status) throws TransactionException;

源码解析:
从TxNamespaceHandler入手

	@Override
	public void init() {
		registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
		registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
		registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
	}

可以看到分别用不同的解析器去解析xml,
TxAdviceBeanDefinitionParser用来解析<tx:advice>并被解析为RuleBasedTransactionAttribute。

parse(Element element, ParserContext parserContext)->parseInternal(Element element, ParserContext parserContext)->getBeanClass(Element element)->TransactionInterceptor
解析到的bean被设置为TransactionInterceptor统一拦截

image

AnnotationDrivenBeanDefinitionParser是用来解析<annotation-driven>为RootBeanDefinition

image

DataSourceTransactionManager 和DataSourceTransactionManager
image

TransactionInterceptor

	@Override
	public Object invoke(final MethodInvocation invocation) throws Throwable {
		// Work out the target class: may be {@code null}.
		// The TransactionAttributeSource should be passed the target class
		// as well as the method, which may be from an interface.
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

		// Adapt to TransactionAspectSupport's invokeWithinTransaction...
		return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
			@Override
			public Object proceedWithInvocation() throws Throwable {
				return invocation.proceed();
			}
		});
	}



protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
			throws Throwable {
		// If the transaction attribute is null, the method is non-transactional.
		final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
//选择事务管理器
		final PlatformTransactionManager tm = determineTransactionManager(txAttr);
//切面方法标识
		final String joinpointIdentification = methodIdentification(method, targetClass);

		if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
			// Standard transaction demarcation with getTransaction and commit/rollback calls.
			TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
			Object retVal = null;
			try {
//原有逻辑执行
				retVal = invocation.proceedWithInvocation();
			}
			catch (Throwable ex) {
				// target invocation exception
				completeTransactionAfterThrowing(txInfo, ex);
				throw ex;
			}
			finally {
//清理TransactionInfo信息
				cleanupTransactionInfo(txInfo);
			}
//提交事务
			commitTransactionAfterReturning(txInfo);
			return retVal;
		}

		else {
			// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
			try {
				Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
						new TransactionCallback<Object>() {
							@Override
							public Object doInTransaction(TransactionStatus status) {
								TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
								try {
									return invocation.proceedWithInvocation();
								}
								catch (Throwable ex) {
									if (txAttr.rollbackOn(ex)) {
										// A RuntimeException: will lead to a rollback.
										if (ex instanceof RuntimeException) {
											throw (RuntimeException) ex;
										}
										else {
											throw new ThrowableHolderException(ex);
										}
									}
									else {
										// A normal return value: will lead to a commit.
										return new ThrowableHolder(ex);
									}
								}
								finally {
									cleanupTransactionInfo(txInfo);
								}
							}
						});

				// Check result: It might indicate a Throwable to rethrow.
				if (result instanceof ThrowableHolder) {
					throw ((ThrowableHolder) result).getThrowable();
				}
				else {
					return result;
				}
			}
			catch (ThrowableHolderException ex) {
				throw ex.getCause();
			}
		}
	}


protected TransactionInfo createTransactionIfNecessary(
			PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

		// If no name specified, apply method identification as transaction name.
		if (txAttr != null && txAttr.getName() == null) {
			txAttr = new DelegatingTransactionAttribute(txAttr) {
				@Override
				public String getName() {
					return joinpointIdentification;
				}
			};
		}

		TransactionStatus status = null;
		if (txAttr != null) {
			if (tm != null) {
//获取事务状态
				status = tm.getTransaction(txAttr);
			}
			else {
//debug..
			}
		}
		return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
	}

接下来看下:AbstractPlatformTransactionManager#getTransaction

@Override
	public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
		Object transaction = doGetTransaction();

//...
                //如果存在事务
		if (isExistingTransaction(transaction)) {
			
			return handleExistingTransaction(definition, transaction, debugEnabled);
		}

		// Check definition settings for new transaction.
		if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
			throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
		}

		//不存在事务,Propagation.MANDATORY,抛出异常
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
//PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED
		else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
//不用挂起
			SuspendedResourcesHolder suspendedResources = suspend(null);
//debug...
			}
			try {
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
			catch (RuntimeException ex) {
				resume(null, suspendedResources);
				throw ex;
			}
			catch (Error err) {
				resume(null, suspendedResources);
				throw err;
			}
		}
		else {
			//创建空事务,同步
			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
						"isolation level will effectively be ignored: " + definition);
			}
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
		}
	}
private TransactionStatus handleExistingTransaction(
			TransactionDefinition definition, Object transaction, boolean debugEnabled)
			throws TransactionException {
//PROPAGATION_NEVER抛出异常(Propagation.NEVER)
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
			throw new IllegalTransactionStateException(
					"Existing transaction found for transaction marked with propagation 'never'");
		}
//PROPAGATION_NOT_SUPPORTED不为这个方法开启事务(Propagation.NOT_SUPPORTED)
//线程存在事务则挂起(挂起实现其实就是断开连接下次进行重连)
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
//debug...
			Object suspendedResources = suspend(transaction);
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
 //创建非事务的事务状态,让方法非事务地执行  
			return prepareTransactionStatus(
					definition, null, false, newSynchronization, debugEnabled, suspendedResources);
		}
//PROPAGATION_REQUIRES_NEW不管是否存在事务,都创建一个新的事务,原来的挂起,新的执行完毕,继续执行老的事务(Propagation.REQUIRES_NEW )
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
			//...
//挂起原来事务
			SuspendedResourcesHolder suspendedResources = suspend(transaction);
			try {
//只要不是never就执行完毕新的
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
			catch (RuntimeException beginEx) {
//异常了恢复原来事务
				resumeAfterBeginException(transaction, suspendedResources, beginEx);
				throw beginEx;
			}
			catch (Error beginErr) {
				resumeAfterBeginException(transaction, suspendedResources, beginErr);
				throw beginErr;
			}
		}
//NESTED如果是嵌套事务(TransactionDefinition.PROPAGATION_NESTED)
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
//如果不允许事务嵌套,则抛出异常
			if (!isNestedTransactionAllowed()) {
				throw new NestedTransactionNotSupportedException(
						"Transaction manager does not allow nested transactions by default - " +"specify 'nestedTransactionAllowed' property with value 'true'");
			}
//debug...
			}
//如果允许使用savepoint保存点保存嵌套事务  
			if (useSavepointForNestedTransaction()) {
				   //为当前事务创建一个保存点  
				DefaultTransactionStatus status =
						prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
				status.createAndHoldSavepoint();
				return status;
			}
			else {
				 //如果不允许使用savepoint保存点保存嵌套事务 ,使用JTA的嵌套commit/rollback调用  
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, null);
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
		}

//debug...
//PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
 //校验已存在的事务,如果已有事务与事务属性配置不一致,则抛出异常  
		if (isValidateExistingTransaction()) {
  //如果事务隔离级别不是默认隔离级别 ,获取到的当前事务隔离级别为null 或 获取不等于事务属性配置的隔离级别
			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
				Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
					Constants isoConstants = DefaultTransactionDefinition.constants;
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] specifies isolation level which is incompatible with existing transaction: " +
							(currentIsolationLevel != null ?
									isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
									"(unknown)"));
				}
			}
                //如果当前已有事务是只读 ,抛出异常
			if (!definition.isReadOnly()) {
				if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] is not marked as read-only but existing transaction is");
				}
			}
		}
		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
		return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
	}
	@Override
	protected void doBegin(Object transaction, TransactionDefinition definition) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		Connection con = null;
		try {
			if (txObject.getConnectionHolder() == null ||
					txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
				Connection newCon = this.dataSource.getConnection();
//debug...
				}
				txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
			}

			txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
			con = txObject.getConnectionHolder().getConnection();

			Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
			txObject.setPreviousIsolationLevel(previousIsolationLevel);

			
			if (con.getAutoCommit()) {
				txObject.setMustRestoreAutoCommit(true);
				//debug...
				con.setAutoCommit(false);
			}
			txObject.getConnectionHolder().setTransactionActive(true);

			int timeout = determineTimeout(definition);
			if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
				txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
			}

			// Bind the session holder to the thread.
			if (txObject.isNewConnectionHolder()) {
				TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
			}
		}

		catch (Throwable ex) {
			if (txObject.isNewConnectionHolder()) {
				DataSourceUtils.releaseConnection(con, this.dataSource);
				txObject.setConnectionHolder(null, false);
			}
			throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
		}
	}
protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
//如果事务是激活的,且当前线程事务同步机制也是激活状态  
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
//挂起当前线程中所有同步的事务  TransactionSynchronization#suspend   还有 TransactionSynchronizationManager#unbindResource
			List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
			try {
				Object suspendedResources = null;
				if (transaction != null) {
					suspendedResources = doSuspend(transaction);
				}
				String name = TransactionSynchronizationManager.getCurrentTransactionName();
				TransactionSynchronizationManager.setCurrentTransactionName(null);
				boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
				TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
				Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
				boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
				TransactionSynchronizationManager.setActualTransactionActive(false);
				return new SuspendedResourcesHolder(
						suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
			}
			catch (RuntimeException ex) {
				// doSuspend failed - original transaction is still active...
				doResumeSynchronization(suspendedSynchronizations);
				throw ex;
			}
			catch (Error err) {
				// doSuspend failed - original transaction is still active...
				doResumeSynchronization(suspendedSynchronizations);
				throw err;
			}
		}
		else if (transaction != null) {
			// Transaction active but no synchronization active.
			Object suspendedResources = doSuspend(transaction);
			return new SuspendedResourcesHolder(suspendedResources);
		}
		else {
			// Neither transaction nor synchronization active.
			return null;
		}
	}



//doSuspendSynchronization方法将逐个挂起当前线程中的TransactionSynchronization
private List<TransactionSynchronization> doSuspendSynchronization() {
		List<TransactionSynchronization> suspendedSynchronizations =
				TransactionSynchronizationManager.getSynchronizations();
		for (TransactionSynchronization synchronization : suspendedSynchronizations) {
			synchronization.suspend();
		}
		TransactionSynchronizationManager.clearSynchronization();
		return suspendedSynchronizations;
	}

//ConnectionSynchronization#suspend
		public void suspend() {
			if (this.holderActive) {
//解绑数据源
				TransactionSynchronizationManager.unbindResource(this.dataSource);
// 如果存在连接,且处于打开状态
				if (this.connectionHolder.hasConnection() && !this.connectionHolder.isOpen()) {
 // 当挂起的时候如果没有句柄连接到该connection,将释放该连接
 // 当resume的时候 会重开打开一个连接参与到原来的事务中
					releaseConnection(this.connectionHolder.getConnection(), this.dataSource);
					this.connectionHolder.setConnection(null);
				}
			}
		}

//恢复事务
	private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) {
		TransactionSynchronizationManager.initSynchronization();
		for (TransactionSynchronization synchronization : suspendedSynchronizations) {
			synchronization.resume();
			TransactionSynchronizationManager.registerSynchronization(synchronization);
		}
	}

//SqlSessionSynchronization#resume
    @Override
    public void resume() {
      if (this.holderActive) {
       //debug...
        TransactionSynchronizationManager.bindResource(this.sessionFactory, this.holder);
      }
    }


//DataSourceTransactionManager#doSuspend
	@Override
	protected Object doSuspend(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		txObject.setConnectionHolder(null);
		ConnectionHolder conHolder = (ConnectionHolder)
				TransactionSynchronizationManager.unbindResource(this.dataSource);
		return conHolder;
	}

事务提交

AbstractPlatformTransactionManager##commit

	@Override
	public final void commit(TransactionStatus status) throws TransactionException {
//如果事务状态是已完成,再次提交会抛出“Transaction is already completed - do not call commit or rollback more than once per transaction”
		if (status.isCompleted()) {
			throw new IllegalTransactionStateException(
					"Transaction is already completed - do not call commit or rollback more than once per transaction");
		}

		DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
//rollback only
		if (defStatus.isLocalRollbackOnly()) {
			if (defStatus.isDebug()) {
				logger.debug("Transactional code has requested rollback");
			}
//回滚
			processRollback(defStatus);
			return;
		}
		if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
			if (defStatus.isDebug()) {
				logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
			}
			processRollback(defStatus);
			// Throw UnexpectedRollbackException only at outermost transaction boundary
			// or if explicitly asked to.
			if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
				throw new UnexpectedRollbackException(
						"Transaction rolled back because it has been marked as rollback-only");
			}
			return;
		}

		processCommit(defStatus);
	}

private void processRollback(DefaultTransactionStatus status) {
		try {
			try {
//回调TransactionSynchronization对象的beforeCompletion方法。
				triggerBeforeCompletion(status);
//有保存点
				if (status.hasSavepoint()) {
					if (status.isDebug()) {
						logger.debug("Rolling back transaction to savepoint");
					}
//回滚到保存点
					status.rollbackToHeldSavepoint();
				}
//如果是一个新事务
				else if (status.isNewTransaction()) {
					if (status.isDebug()) {
						logger.debug("Initiating transaction rollback");
					}
//rollback
					doRollback(status);
				}
				else if (status.hasTransaction()) {
//如果RollbackOnly或者globalRollbackOnParticipationFailure(部分失败)
					if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
						if (status.isDebug()) {
							logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
						}
						doSetRollbackOnly(status);
					}
					else {
						if (status.isDebug()) {
							logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
						}
					}
				}
				else {
					logger.debug("Should roll back transaction but cannot - no transaction available");
				}
			}
			catch (RuntimeException ex) {
//TransactionSynchronization 的afterCompletion
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
				throw ex;
			}
			catch (Error err) {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
				throw err;
			}
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
		}
		finally {
			cleanupAfterCompletion(status);
		}
	}


private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;
			try {
				prepareForCommit(status);
//TransactionSynchronization 的beforeCommit 提交前提示
				triggerBeforeCommit(status);
//TransactionSynchronization 的beforeCompletion 完成前提示
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;
				boolean globalRollbackOnly = false;
				if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
					globalRollbackOnly = status.isGlobalRollbackOnly();
				}
				if (status.hasSavepoint()) {
					if (status.isDebug()) {
						logger.debug("Releasing transaction savepoint");
					}
					status.releaseHeldSavepoint();
				}
				else if (status.isNewTransaction()) {
					if (status.isDebug()) {
						logger.debug("Initiating transaction commit");
					}
//提交事务
					doCommit(status);
				}
				// Throw UnexpectedRollbackException if we have a global rollback-only
				// marker but still didn't get a corresponding exception from commit.
				if (globalRollbackOnly) {
					throw new UnexpectedRollbackException(
							"Transaction silently rolled back because it has been marked as rollback-only");
				}
			}
			catch (UnexpectedRollbackException ex) {
				// can only be caused by doCommit
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
				throw ex;
			}
			catch (TransactionException ex) {
				// can only be caused by doCommit
				if (isRollbackOnCommitFailure()) {
					doRollbackOnCommitException(status, ex);
				}
				else {
					triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
				}
				throw ex;
			}
			catch (RuntimeException ex) {
				if (!beforeCompletionInvoked) {
					triggerBeforeCompletion(status);
				}
				doRollbackOnCommitException(status, ex);
				throw ex;
			}
			catch (Error err) {
				if (!beforeCompletionInvoked) {
					triggerBeforeCompletion(status);
				}
				doRollbackOnCommitException(status, err);
				throw err;
			}

			// Trigger afterCommit callbacks, with an exception thrown there
			// propagated to callers but the transaction still considered as committed.
			try {
				triggerAfterCommit(status);
			}
			finally {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
			}

		}
		finally {
			cleanupAfterCompletion(status);
		}
	}

隔离级别(其实最后是通过jdbc的隔离级别做的):

Isolation.DEFAULT(TransactionDefinition.ISOLATION_DEFAULT)
使用数据库默认的事务隔离级别。

Isolation.READ_UNCOMMITTED(TransactionDefinition.ISOLATION_READ_UNCOMMITTED),
这是事务最低的隔离级别,它允许另外一个事务可以看到这个事务未提交的数据。这种隔离级别会产生脏读,不可重复读和幻像读。
实现:SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED

Isolation.READ_COMMITTED(TransactionDefinition.ISOLATION_READ_COMMITTED),
保证一个事务修改的数据提交后才能被另外一个事务读取。另外一个事务不能读取该事务未提交的数据。这种事务隔离级别可以避免脏读出现,但是可能会出现不可重复读和幻像读。
实现: SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED

Isolation.REPEATABLE_READ(TransactionDefinition.ISOLATION_REPEATABLE_READ),
这种事务隔离级别可以防止脏读,不可重复读。但是可能出现幻像读。
实现:SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ

Isolation.SERIALIZABLE(TransactionDefinition.ISOLATION_SERIALIZABLE);
这是花费最高代价但是最可靠的事务隔离级别。事务被处理为顺序执行。除了防止脏读,不可重复读外,还避免了幻读。
实现:SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE

详细:http://www1350.github.io/#post/64

rabbitmq

linux下安装rabbitmq
1.安装erlang虚拟机
需要安装ncurses : yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC-devel
http://www.wxwidgets.org/downloads/):
下载wxWidgets 源码包 后解压缩并编译安装

 bzip2 -d wxWidgets-3.0.0.tar.bz2     tar   -jxvf 
tar -xvf wxWidgets-3.0.0.tar
 ./configure && make && make install

Rabbitmq 基于 erlang 语言开发,所有需要安装 erlang 虚拟机。
下载源码编译安装:
http://www.erlang.org/download/otp_src_R16B.tar.gz
上传到/usr/local/src目录

[root@localhost ~]# cd /usr/local/src/
[root@localhost src]# tar -zxvf otp_src_R16B.tar.gz 
[root@localhost src]# cd otp_src_R16B
[root@localhost otp_src_R16B]# ./configure 
[root@localhost otp_src_R16B]# make
[root@localhost otp_src_R16B]# make install

[root@localhost otp_src_R16B]# erl
Erlang R16B (erts-5.10.1) [source] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V5.10.1 (abort with ^G)
1> halt().

2.安装(rabbitmq-server-3.5.2) 默认端口5672 管理
下载安装:http://www.rabbitmq.com/install-generic-unix.html rabbitmq-server-generic-unix-3.5.2.tar.gz
上传到/usr/local/src目录

[root@localhost ~]# cd /usr/local/
[root@localhost local]# tar -zxvf rabbitmq-server-generic-unix-3.5.2.tar.gz 
[root@localhost local]# cd rabbitmq_server-3.5.2/
[root@localhost rabbitmq_server-3.5.2]# ./sbin/rabbitmq-server -detached

启用管理方式(用网页方式管理MQ)
执行./sbin/rabbitmq-plugins enable rabbitmq_management

3.建立感知需要的用户及授权
1)使用rabbitmqctl客户端工具,在根目录下创建/ home/mqhost1这个vhost:

[root@localhost rabbitmq_server-3.5.2]# ./sbin/rabbitmqctl add_vhost /home/absurd .vhost

2)创建感知用户absurd 密码absurd

[root@localhost rabbitmq_server-3.5.2]# ./sbin/rabbitmqctl add_user absurd absurd 

3)设置ganzi用户对/ home/ganzi.vhost这个vhost拥有全部权限:
[root@localhost rabbitmq_server-3.5.2]# ./sbin/rabbitmqctl set_permissions -p /home/absurd.vhost absurd ".*" ".*" ".*"
4)设置用户角色

[root@localhost rabbitmq_server-3.5.2]# ./sbin/rabbitmqctl set_user_tags absurd administrator
[root@localhost rabbitmq_server-3.5.2]# ./sbin/rabbitmqctl list_users

Listing users ...
absurd [administrator]
guest [administrator]
5)修改guest用户密码为absurd
[root@localhost rabbitmq_server-3.5.2]# ./sbin/rabbitmqctl change_password guest absurd
6)启用管理方式(用网页方式管理MQ)
[root@localhost rabbitmq_server-3.5.2]# ./sbin/rabbitmq-plugins enable rabbitmq_management
7)重新启动MQ

[root@localhost rabbitmq_server-3.5.2]# ./sbin/rabbitmqctl stop
[root@localhost rabbitmq_server-3.5.2]# ./sbin/rabbitmq-server -detached
  1. 用户管理

用户管理包括增加用户,删除用户,查看用户列表,修改用户密码。

相应的命令

(1) 新增一个用户

rabbitmqctl add_user Username Password

(2) 删除一个用户

rabbitmqctl delete_user Username

(3) 修改用户的密码
rabbitmqctl change_password Username Newpassword

(4) 查看当前用户列表

rabbitmqctl list_users

  1. 用户角色

按照个人理解,用户角色可分为五类,超级管理员, 监控者, 策略制定者, 普通管理者以及其他。

(1) 超级管理员(administrator)

可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。

(2) 监控者(monitoring)

可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

(3) 策略制定者(policymaker)

可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

与administrator的对比,administrator能看到这些内容

(4) 普通管理者(management)

仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。

(5) 其他

无法登陆管理控制台,通常就是普通的生产者和消费者。

了解了这些后,就可以根据需要给不同的用户设置不同的角色,以便按需管理。

设置用户角色的命令为:

rabbitmqctl set_user_tags User Tag

User为用户名, Tag为角色名(对应于上面的administrator,monitoring,policymaker,management,或其他自定义名称)。

也可以给同一用户设置多个角色,例如

rabbitmqctl set_user_tags hncscwc monitoring policymaker

  1. 用户权限

用户权限指的是用户对exchange,queue的操作权限,包括配置权限,读写权限。配置权限会影响到exchange,queue的声明和删除。读写权限影响到从queue里取消息,向exchange发送消息以及queue和exchange的绑定(bind)操作。

例如: 将queue绑定到某exchange上,需要具有queue的可写权限,以及exchange的可读权限;向exchange发送消息需要具有exchange的可写权限;从queue里取数据需要具有queue的可读权限。详细请参考官方文档中"How permissions work"部分。

相关命令为:

(1) 设置用户权限

rabbitmqctl set_permissions -p VHostPath User ConfP WriteP ReadP

(2) 查看(指定hostpath)所有用户的权限信息

rabbitmqctl list_permissions [-p VHostPath]

(3) 查看指定用户的权限信息

rabbitmqctl list_user_permissions User

(4) 清除用户的权限信息

rabbitmqctl clear_permissions [-p VHostPath] User

查看所有队列信息

rabbitmqctl list_queues

关闭应用

rabbitmqctl stop_app

启动应用,和上述关闭命令配合使用,达到清空队列的目的

rabbitmqctl start_app

清除所有队列

rabbitmqctl reset

添加用户:

rabbitmqctl add_user root root

设置权限

rabbitmqctl set_permissions -p / root "." "." ".*"

查看用户

rabbitmqctl list_users

列出所有exchange

rabbitmqctl list_exchanges

列出所有queues

rabbitmqctl list_queues

解决emoji存储

1.utf8mb4的最低mysql版本支持版本为5.5.3+,若不是,请升级到较新版本。

2.修改mysql配置文件my.cnf(windows为my.ini)
my.cnf一般在etc/mysql/my.cnf位置。找到后请在以下三部分里添加如下内容:

[client]
default-character-set = utf8mb4

[mysql]
default-character-set = utf8mb4

[mysqld]
character-set-client-handshake = FALSE
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci
init_connect='SET NAMES utf8mb4'

在mysql中执行:

set character_set_client = utf8mb4;
set character_set_connection = utf8mb4;
set character_set_database = utf8mb4;
set character_set_results = utf8mb4;
set character_set_server = utf8mb4;

重启mysql
Linux:service mysql restart

3.修改database、table和column字符集。参考以下语句:

ALTER DATABASE database_name CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
ALTER TABLE table_name CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
ALTER TABLE table_name MODIFY COLUMN column_name VARCHAR(191) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci ;

查看是否修改成功
SHOW VARIABLES WHERE Variable_name LIKE 'character\_set\_%' OR Variable_name LIKE 'collation%';

4.如果你用的是java服务器,升级或确保你的mysql connector版本高于5.1.13,否则仍然无法使用utf8mb4

5.jdbc的url必须&characterEncoding=utf8

6.备份数据库的时候
mysqldump -uroot -p --default-character-set=utf8mb4 --hex-blob databasename > databasename.sql

注:在navicat里面会看到乱码
可选:

7.解决不兼容问题

public class EmojiUtil {

    public static String[] ios5emoji ;
    public static String[] ios4emoji ;
    public static String[] androidnullemoji ;
    public static String[] adsbuniemoji;

    public static void initios5emoji(String[] i5emj,String[] i4emj,String[] adnullemoji,String[] adsbemoji){
        ios5emoji = i5emj;
        ios4emoji = i4emj;
        androidnullemoji = adnullemoji;
        adsbuniemoji = adsbemoji;
    }

    //在ios上将ios5转换为ios4编码
    public static String transToIOS4emoji(String src) {
        return StringUtils.replaceEach(src, ios5emoji, ios4emoji);
    }
    //在ios上将ios4转换为ios5编码
    public static String transToIOS5emoji(String src) {
        return StringUtils.replaceEach(src, ios4emoji, ios5emoji);
    }
    //在android上将ios5的表情符替换为空
    public static String transToAndroidemojiNull(String src) {
        return StringUtils.replaceEach(src, ios5emoji, androidnullemoji);
    }

    //在android上将ios5的表情符替换为SBUNICODE
    public static String transToAndroidemojiSB(String src) {
        return StringUtils.replaceEach(src, ios5emoji, adsbuniemoji);
    }

    //在android上将SBUNICODE的表情符替换为ios5
    public static String transSBToIOS5emoji(String src) {
        return StringUtils.replaceEach(src, adsbuniemoji, ios5emoji);
    }

    //eg. param: 0xF0 0x9F 0x8F 0x80
    public static String hexstr2String(String hexstr) throws UnsupportedEncodingException{
        byte[] b = hexstr2bytes(hexstr);
        return new String(b, "UTF-8");
    }

    //eg. param: E018
    public static String sbunicode2utfString(String sbhexstr) throws UnsupportedEncodingException{
        byte[] b = sbunicode2utfbytes(sbhexstr);
        return new String(b, "UTF-8");
    }

    //eg. param: 0xF0 0x9F 0x8F 0x80
    public static byte[] hexstr2bytes(String hexstr){
        String[] hexstrs = hexstr.split(" ");
        byte[] b = new byte[hexstrs.length];

        for(int i=0;i<hexstrs.length;i++){
            b[i] = hexStringToByte(hexstrs[i].substring(2))[0];
        }
        return b;
    }

    //eg. param: E018
    public static byte[] sbunicode2utfbytes(String sbhexstr) throws UnsupportedEncodingException{
        int inthex = Integer.parseInt(sbhexstr, 16);
        char[] schar = {(char)inthex};
        byte[] b = (new String(schar)).getBytes("UTF-8");
        return b;
    }

    public static byte[] hexStringToByte(String hex) {
        int len = (hex.length() / 2);
        byte[] result = new byte[len];
        char[] achar = hex.toCharArray();
        for (int i = 0; i < len; i++) {
            int pos = i * 2;
            result[i] = (byte) (toByte(achar[pos]) << 4 | toByte(achar[pos + 1]));
        }
        return result;
    }


    private static byte toByte(char c) {
        byte b = (byte) "0123456789ABCDEF".indexOf(c);
        return b;
    }


    /**
     * 将str中的emoji表情转为byte数组
     * 
     * @param str
     * @return
     */
    public static String resolveToByteFromEmoji(String str) {
        Pattern pattern = Pattern
                .compile("[^(\u2E80-\u9FFF\\w\\s`~!@#\\$%\\^&\\*\\(\\)_+-?()——=\\[\\]{}\\|;。,、《》”:;“!……’:'\"<,>\\.?/\\\\*)]");
        Matcher matcher = pattern.matcher(str);
        StringBuffer sb2 = new StringBuffer();
        while (matcher.find()) {
            matcher.appendReplacement(sb2, resolveToByte(matcher.group(0)));
        }
        matcher.appendTail(sb2);
        return sb2.toString();
    }

    /**
     * 将str中的byte数组类型的emoji表情转为正常显示的emoji表情
     * 
     * @param str
     * @return
     */
    public static String resolveToEmojiFromByte(String str) {
        Pattern pattern2 = Pattern.compile("<:([[-]\\d*[,]]+):>");
        Matcher matcher2 = pattern2.matcher(str);
        StringBuffer sb3 = new StringBuffer();
        while (matcher2.find()) {
            matcher2.appendReplacement(sb3, resolveToEmoji(matcher2.group(0)));
        }
        matcher2.appendTail(sb3);
        return sb3.toString();
    }

    private static String resolveToByte(String str) {
        byte[] b = str.getBytes();
        StringBuffer sb = new StringBuffer();
        sb.append("<:");
        for (int i = 0; i < b.length; i++) {
            if (i < b.length - 1) {
                sb.append(Byte.valueOf(b[i]).toString() + ",");
            } else {
                sb.append(Byte.valueOf(b[i]).toString());
            }
        }
        sb.append(":>");
        return sb.toString();
    }

    private static String resolveToEmoji(String str) {
        str = str.replaceAll("<:", "").replaceAll(":>", "");
        String[] s = str.split(",");
        byte[] b = new byte[s.length];
        for (int i = 0; i < s.length; i++) {
            b[i] = Byte.valueOf(s[i]);
        }
        return new String(b);
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        // TODO Auto-generated method stub
        byte[] b1 = {-30,-102,-67}; //ios5 //0xE2 0x9A 0xBD     
        byte[] b2 = {-18,-128,-104}; //ios4 //"E018"

        //-------------------------------------

        byte[] b3 = {-16,-97,-113,-128};    //0xF0 0x9F 0x8F 0x80       
        byte[] b4 = {-18,-112,-86};         //E42A  


        ios5emoji = new String[]{new String(b1,"utf-8"),new String(b3,"utf-8")};
        ios4emoji = new String[]{new String(b2,"utf-8"),new String(b4,"utf-8")};    





        //测试字符串
        byte[] testbytes = {105,111,115,-30,-102,-67,32,36,-18,-128,-104,32,36,-16,-97,-113,-128,32,36,-18,-112,-86};
        String tmpstr = new String(testbytes,"utf-8");
        System.out.println(tmpstr);


        //转成ios4的表情
        String ios4str = transToIOS5emoji(tmpstr);
        byte[] tmp = ios4str.getBytes();
        //System.out.print(new String(tmp,"utf-8"));        
        for(byte b:tmp){
            System.out.print(b);
            System.out.print(" ");
        }
    }

}

另:
如果你不想重启数据库,可以这样做:

1.表必须是utf8mb4

2.连接池
<property name="connectionInitSqls" value="set names utf8mb4;"/>

或者每次插入前执行
set names utf8mb4

mybatis+spring

注解式

    <bean id="productSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <!-- 自动扫描mapping.xml文件
        <property name="mapperLocations" value="classpath*:com/absurd/*/mapping/*.xml" /> -->
        <property name="configLocation" value="classpath:mybatis/configuration.xml"/>
        <property name="dataSource" ref="productDataSource" />
    </bean>

    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer" id="zheshangProductMapperScannerConfigurer">
        <property name="basePackage" value="com.absurd.mydemo.mapper.testmy" />
        <property name="sqlSessionFactoryBeanName" value="productSqlSessionFactory" />
    </bean>

在接口上使用注解

public interface IAbsurdMapper {.
   @Select(" SELECT * from user where id = #{id}")
 @Results( value = { @Result(column = "id", property = "id"),
                       @Result(column = "user_name", property = "username"})
    public User getUser(@Param("id") Long id);
}

<?xml version="1.0" encoding="UTF-8" ?> <!-- 该文件为mybatis的全局配置文件 -->
<!DOCTYPE configuration      
    PUBLIC "-//ibatis.apache.org//DTD Config 3.0//EN"      
    "http://ibatis.apache.org/dtd/ibatis-3-config.dtd">
<configuration>
    <settings>

        <!-- 全局映射器启用缓存 -->
        <setting name="cacheEnabled" value="true" />
        <!-- 查询时,关闭关联对象即时加载以提高性能 -->
        <setting name="lazyLoadingEnabled" value="true" />
        <!-- 设置关联对象加载的形态,此处为按需加载字段(加载字段由SQL指 定),不会加载关联表的所有字段,以提高性能 -->
        <setting name="aggressiveLazyLoading" value="false" />
        <!-- 对于未知的SQL查询,允许返回不同的结果集以达到通用的效果 -->
        <setting name="multipleResultSetsEnabled" value="true" />
        <!-- 允许使用列标签代替列名 -->
        <setting name="useColumnLabel" value="true" />
        <!-- 允许使用自定义的主键值(比如由程序生成的UUID 32位编码作为键值),数据表的PK生成策略将被覆盖 -->
        <setting name="useGeneratedKeys" value="true" />
        <!-- 给予被嵌套的resultMap以字段-属性的映射支持 -->
        <setting name="autoMappingBehavior" value="FULL" />
        <!-- 对于批量更新操作缓存SQL以提高性能 -->
        <setting name="defaultExecutorType" value="BATCH" />
        <!-- 数据库超过25000秒仍未响应则超时 -->
        <setting name="defaultStatementTimeout" value="25000" />
    </settings>

    <!-- 全局别名设置,在映射文件中只需写别名,而不必写出整个类路径 -->
<!--    <typeAliases> -->
<!--    </typeAliases> -->
    <!-- 非注解的sql映射文件配置,如果使用mybatis注解,该mapper无需配置,但是如果mybatis注解中包含@resultMap注解,则mapper必须配置,给resultMap注解使用 -->
<!--    <mappers> -->
<!--    </mappers> -->
</configuration>

另一种方式SQL生成器(利用SqlBuilder生成)
需要import static org.apache.ibatis.jdbc.SqlBuilder.*;

@SelectProvider(type = UserSqlBuilder.class, method = "buildGetUsersByName")
List<User> getUsersByName(String name);

class UserSqlBuilder {
  public String buildGetUsersByName(final String name) {
    return new SQL(){{
      SELECT("*");
      FROM("users");
      if (name != null) {
        WHERE("name like #{value} || '%'");
      }
      ORDER_BY("id");
    }}.toString();
  }
}

xml方式

基础配置一样。
xml配置

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.asgard.bms.business.mapper.InfoUser">
<resultMap id="infoUser" type="com.asgard.bms.business.model.InfoUser" >
    <id column="ID" property="id" jdbcType="VARCHAR" />
    <result column="USERNAME" property="userName" jdbcType="VARCHAR" />
    <result column="PASSWORD" property="password" jdbcType="VARCHAR" />
    <result column="CREATE_USER" property="createUser" jdbcType="VARCHAR" />
    <result column="CREATE_DATE" property="createDate" jdbcType="VARCHAR" />
    <result column="IS_VALID" property="isValid" jdbcType="INTEGER" />
    <result column="LATEST_OP_DATE" property="latestOpDate" jdbcType="VARCHAR" />
    <result column="LATEST_OP_USER" property="latestOpUser" jdbcType="VARCHAR" />
</resultMap>


<select id="queryByUserName" parameterType="com.asgard.bms.business.model.InfoUser" resultMap="infoUser">
    SELECT * FROM INFO_USER IU 

    <where>
         IU.IS_VALID=1
        <if test="userName !=null and userName!=''">
            AND IU.USERNAME=#{userName,jdbcType=VARCHAR}
        </if>
        <if test="password !=null and password!=''">
            AND IU.PASSWORD=#{password,jdbcType=VARCHAR}
        </if>
    </where>
</select>


<insert id="insertUser" parameterType="com.asgard.bms.business.model.InfoUser">
    INSERT INTO INFO_USER(ID,USERNAME,PASSWORD,CREATE_USER,CREATE_DATE,IS_VALID) 
            VALUES(
                #{id,jdbcType=VARCHAR},
                #{userName,jdbcType=VARCHAR},
                #{password,jdbcType=VARCHAR},
                #{createUser,jdbcType=VARCHAR},
                #{createDate,jdbcType=VARCHAR},
                #{isValid,jdbcType=INTEGER}
            );
</insert>

<select id="queryUserCount" parameterType="com.asgard.bms.business.model.InfoUser" resultType="int">
    select count(1) from INFO_USER IU
     <where>
         IU.IS_VALID=1
        <if test="userName !=null and userName!=''">
            AND IU.USERNAME=#{userName,jdbcType=VARCHAR}
        </if>
        <if test="password !=null and password!=''">
            AND IU.PASSWORD=#{password,jdbcType=VARCHAR}
        </if>
    </where>
</select>

</mapper>
package com.asgard.bms.business.dao.impl;

import java.util.List;

import javax.annotation.Resource;

import org.apache.ibatis.session.RowBounds;
import org.springframework.stereotype.Repository;

import com.asgard.bms.business.base.BaseDAO;
import com.asgard.bms.business.dao.ILoginDao;
import com.asgard.bms.business.model.InfoUser;

@Repository
public class LoginDao implements ILoginDao{

    private static final String NAMESPACE_INFOUSER = "com.asgard.bms.business.mapper.InfoUser.";
    @Resource
    private BaseDAO baseDAO;

    @Override
    public InfoUser login(InfoUser user) throws Exception {
        // TODO Auto-generated method stub
        return this.baseDAO.selectOne(NAMESPACE_INFOUSER+"queryByUserName",user);
    }

    /***
     * 注册用户
     */
    @Override
    public int regirestUser(InfoUser infoUser) {
        // TODO Auto-generated method stub
        return baseDAO.insert(NAMESPACE_INFOUSER+"insertUser", infoUser);
    }
    /***
     * 测试分页功能
     */
    @Override
    public List<InfoUser> queryUserList(InfoUser infoUser, int pagenum,
            int pagesize) {
        RowBounds rowBounds = new RowBounds((pagenum-1)*pagesize, pagesize);
        return baseDAO.selectList(NAMESPACE_INFOUSER+"queryByUserName", infoUser,rowBounds);
    }

    @Override
    public int queryUserCount(InfoUser infoUser) {
        // TODO Auto-generated method stub
        return baseDAO.selectOne(NAMESPACE_INFOUSER+"queryUserCount", infoUser);
    }

}

资料:
官网
分页
注解
mybatis传多个参数
MyBatis使用注解处理List类型的参数
mybatis blob
MyBatis自定义数据映射TypeHandler

线程安全队列Queue

线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue

  • 单生产者,单消费者 用 LinkedBlockingqueue
  • 多生产者,单消费者 用 LinkedBlockingqueue
  • 单生产者 ,多消费者 用 ConcurrentLinkedQueue
  • 多生产者 ,多消费者 用 ConcurrentLinkedQueue

可能报异常 返回布尔值 可能阻塞 设定等待时间

  • 入队 add(e) offer(e) put(e) offer(e, timeout, unit)
  • 出队 remove() poll() take() poll(timeout, unit)
  • 查看 element() peek() 无 无
  • add(e) remove() element() 方法不会阻塞线程。当不满足约束条件时,会抛出IllegalStateException 异常。例如:当队列被元素填满后,再调用add(e),则会抛出异常。
  • offer(e) poll() peek() 方法即不会阻塞线程,也不会抛出异常。例如:当队列被元素填满后,再调用offer(e),则不会插入元素,函数返回false。
  • 要想要实现阻塞功能,需要调用put(e) take() 方法。当不满足约束条件时,会阻塞线程。

ConcurrentLinkedQueue内部有个匿名内部类Node
源码如下

private static class Node<E> {
        volatile E item;
        volatile Node<E> next;

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

LinkedBlockingQueue源码阅读

LinkedBlockingQueue实现了BlockingQueue、java.io.Serializable接口继承自AbstractQueue

构造函数:

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

没有入参,会开一个很大的队列231-1(整型最大数)

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

我们看到是一个链表,里面每个节点都是一个内部类Node

    static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) { item = x; }
    }

put方法

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

这里的this.count、this.putLock为成员变量
AtomicInteger 一个提供原子操作的Integer的类,用来计算现在队列有多少个元素
private final AtomicInteger count = new AtomicInteger(0);

ReentrantLock可重入锁,take和put分别有锁。这里不分析
private final ReentrantLock putLock = new ReentrantLock();
private final ReentrantLock takeLock = new ReentrantLock();
使用put的时候,如果满了会挂起线程
private final Condition notFull = putLock.newCondition();
使用take的时候,如果为空会挂起线程
private final Condition notFull = putLock.newCondition();

当队列的数量达到最大的时候,调用notFull.await();线程等待释放CPU
接下来调用enqueue(node); last = last.next = node;先把last的下一个指向新节点,然后把last重新指向这个节点

    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

技术器+1 c = count.getAndIncrement();

如果下一个进来的还不会达到队列最大数量就 notFull.signal();

如果队列原来空了,现在被放入一个元素,就会让正在等待的线程解锁,可以看到notEmpty是在take里面wait的

private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

在看下take方法

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

队列是先进先出的,所以将头指针直接指向第二个元素,h.next = h这个可以帮GC回收,如果原来的h指向的是现在的队列第一的元素first,这样一来GC不会认为他无用,自己指向自己。注意这里取item是第二个的item因为构造的时候第一个元素的item是null

    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

另外一种构造方法,将所有元素直接构造,使用一个循环,注意过程前后也是加锁的

    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

contains方法,很明显。从头开始一个个比较过去,有就返回true,这时候两把锁都加

    public boolean contains(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

使用take()函数,如果队列中没有数据,则线程wait释放CPU,而poll()则不会等待,直接返回null;同样,空间耗尽时offer()函数不会等待,直接返回false,而put()则会wait,因此如果你使用while(true)来获得队列元素,千万别用poll(),CPU会100%的。

    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

peek和take还有poll区别就是拿到队列第一个元素后吧第一个元素销毁

    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

Dubbo与Zookeeper、SpringMVC整合和使用(负载均衡、容错)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans  
        http://www.springframework.org/schema/beans/spring-beans.xsd  
        http://code.alibabatech.com/schema/dubbo  
        dubbo.xsd  ">
    <dubbo:application  name="dubbo_provider"/>
    <dubbo:registry address="zookeeper://127.0.0.1:2181" check="false"  subscribe="false" register=""/> 
    <dubbo:monitor protocol="registry" />
    <!-- 要暴露的服务接口 -->    
 <dubbo:service interface="com.absurd.service.TestRegistryService" ref="testRegistryService" />  
</beans>  
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans  
        http://www.springframework.org/schema/beans/spring-beans.xsd  
        http://code.alibabatech.com/schema/dubbo  
        dubbo.xsd  ">
    <dubbo:application name="dubbo_consumer"/>
    <dubbo:registry address="zookeeper://serverHost:2181" check="false" register=""/> 
    <dubbo:reference id="testRegistryService" interface="com.absurd.service.TestRegistryService"  timeout="120000" />
</beans>  

详细文档:http://dubbo.io/Home-zh.htm

java8-更好用的时间处理

历史

在Java刚刚发布,也就是版本1.0的时候,对时间和日期仅有的支持就是java.util.Date类。大多数开发者对它的第一印象就是,它根本不代表一个“日期”。实际上,它只是简单的表示一个,从1970-01-01Z开始计时的,精确到毫秒的瞬时点。由于标准的toString()方法,按照JVM的默认时区输出时间和日期,有些开发人员把它误认为是时区敏感的。

在升级Java到1.1期间,Date类被认为是无法修复的。由于这个原因,java.util.Calendar类被添加了进来。悲剧的是,Calendar类并不比java.util.Date好多少。它们面临的部分问题是:

可变性。像时间和日期这样的类应该是不可变的。
偏移性。Date中的年份是从1900开始的,而月份都是从0开始的。
命名。Date不是“日期”,而Calendar也不真实“日历”。
格式化。格式化只对Date有用,Calendar则不行。另外,它也不是线程安全的。

package demo;

import java.util.Date;
import java.text.SimpleDateFormat;
import java.text.DateFormat;
import java.text.ParseException;
import java.util.Calendar;

public class Test
{
  public Test()
  {
  }

  public static void main(String[] args)
  {
    // 字符串转换日期格式
    // DateFormat fmtDateTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    // 接收传入参数
    // String strDate = args[1];
    // 得到日期格式对象
    // Date date = fmtDateTime.parse(strDate);

    // 完整显示今天日期时间
    String str = (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS")).format(new Date());
    System.out.println(str);

    // 创建 Calendar 对象
    Calendar calendar = Calendar.getInstance();

    try
    {
      // 对 calendar 设置时间的方法
      // 设置传入的时间格式
      SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-M-d H:m:s");
      // 指定一个日期
      Date date = dateFormat.parse("2013-6-1 13:24:16");
      // 对 calendar 设置为 date 所定的日期
      calendar.setTime(date);

      // 按特定格式显示刚设置的时间
      str = (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS")).format(calendar.getTime());
      System.out.println(str);
    }
    catch (ParseException e)
    {
      e.printStackTrace();
    }

    // 或者另一種設置 calendar 方式
    // 分別爲 year, month, date, hourOfDay, minute, second
    calendar = Calendar.getInstance();
    calendar.set(2013, 1, 2, 17, 35, 44);
    str = (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS")).format(calendar.getTime());
    System.out.println(str);

    // Calendar 取得当前时间的方法
    // 初始化 (重置) Calendar 对象
    calendar = Calendar.getInstance();
    // 或者用 Date 来初始化 Calendar 对象
    calendar.setTime(new Date());

    // setTime 类似上面一行
    // Date date = new Date();
    // calendar.setTime(date);

    str = (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS")).format(calendar.getTime());
    System.out.println(str);

    // 显示年份
    int year = calendar.get(Calendar.YEAR);
    System.out.println("year is = " + String.valueOf(year));

    // 显示月份 (从0开始, 实际显示要加一)
    int month = calendar.get(Calendar.MONTH);
    System.out.println("nth is = " + (month + 1));

    // 本周几
    int week = calendar.get(Calendar.DAY_OF_WEEK);
    System.out.println("week is = " + week);

    // 今年的第 N 天
    int DAY_OF_YEAR = calendar.get(Calendar.DAY_OF_YEAR);
    System.out.println("DAY_OF_YEAR is = " + DAY_OF_YEAR);

    // 本月第 N 天
    int DAY_OF_MONTH = calendar.get(Calendar.DAY_OF_MONTH);
    System.out.println("DAY_OF_MONTH = " + String.valueOf(DAY_OF_MONTH));

    // 3小时以后
    calendar.add(Calendar.HOUR_OF_DAY, 3);
    int HOUR_OF_DAY = calendar.get(Calendar.HOUR_OF_DAY);
    System.out.println("HOUR_OF_DAY + 3 = " + HOUR_OF_DAY);

    // 当前分钟数
    int MINUTE = calendar.get(Calendar.MINUTE);
    System.out.println("MINUTE = " + MINUTE);

    // 15 分钟以后
    calendar.add(Calendar.MINUTE, 15);
    MINUTE = calendar.get(Calendar.MINUTE);
    System.out.println("MINUTE + 15 = " + MINUTE);

    // 30分钟前
    calendar.add(Calendar.MINUTE, -30);
    MINUTE = calendar.get(Calendar.MINUTE);
    System.out.println("MINUTE - 30 = " + MINUTE);

    // 格式化显示
    str = (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SS")).format(calendar.getTime());
    System.out.println(str);

    // 重置 Calendar 显示当前时间
    calendar.setTime(new Date());
    str = (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SS")).format(calendar.getTime());
    System.out.println(str);

    // 创建一个 Calendar 用于比较时间
    Calendar calendarNew = Calendar.getInstance();

    // 设定为 5 小时以前,后者大,显示 -1
    calendarNew.add(Calendar.HOUR, -5);
    System.out.println("时间比较:" + calendarNew.compareTo(calendar));

    // 设定7小时以后,前者大,显示 1
    calendarNew.add(Calendar.HOUR, +7);
    System.out.println("时间比较:" + calendarNew.compareTo(calendar));

    // 退回 2 小时,时间相同,显示 0
    calendarNew.add(Calendar.HOUR, -2);
    System.out.println("时间比较:" + calendarNew.compareTo(calendar));
  }
}

新的API:

java.time,由5个包组成:
java.time – 包含值对象的基础包
java.time.chrono – 提供对不同的日历系统的访问
java.time.format – 格式化和解析时间和日期
java.time.temporal – 包括底层框架和扩展特性
java.time.zone – 包含时区支持的类
大多数开发者只会用到基础和format包,也可能会用到temporal包。因此,尽管有68个新的公开类型,大多数开发者,大概,将只会用到其中的三分之一。

日期

在新的API中,LocalDate是其中最重要的类之一。它是表示日期的不可变类型,不包含时间和时区。

“本地”,这个术语,我们对它的熟悉来自于Joda-Time。它原本出自ISO-8061的时间和日期标准,它和时区无关。实际上,本地日期只是日期的描述,例如“2014年4月5日”。特定的本地时间,因你在地球上的不同位置,开始于不同的时间线。所以,澳大利亚的本地时间开始的比伦敦早10小时,比旧金山早18小时。

// 取当前日期:
LocalDate today = LocalDate.now(); // -> 2014-12-24
// 根据年月日取日期,12月就是12:
LocalDate crischristmas = LocalDate.of(2014, 12, 25); // -> 2014-12-25
// 根据字符串取:
LocalDate endOfFeb = LocalDate.parse("2014-02-28"); // 严格按照ISO yyyy-MM-dd验证,02写成2都不行,当然也有一个重载方法允许自己定义格式
LocalDate.parse("2014-02-29"); // 无效日期无法通过:DateTimeParseException: Invalid date
// 取本月第1天:
LocalDate firstDayOfThisMonth = today.with(TemporalAdjusters.firstDayOfMonth()); // 2014-12-01
// 取本月第2天:
LocalDate secondDayOfThisMonth = today.withDayOfMonth(2); // 2014-12-02
// 取本月最后一天,再也不用计算是28,29,30还是31:
LocalDate lastDayOfThisMonth = today.with(TemporalAdjusters.lastDayOfMonth()); // 2014-12-31
// 取下一天:
LocalDate firstDayOf2015 = lastDayOfThisMonth.plusDays(1); // 变成了2015-01-01
// 取2015年1月第一个周一,这个计算用Calendar要死掉很多脑细胞:
LocalDate firstMondayOf2015 = LocalDate.parse("2015-01-01").with(TemporalAdjusters.firstInMonth(DayOfWeek.MONDAY)); // 2015-01-05
LocalDate date = LocalDate.of(2014, Month.JUNE, 10);
int year = date.getYear(); // 2014
Month month = date.getMonth(); // 6月
int dom = date.getDayOfMonth(); // 10
DayOfWeek dow = date.getDayOfWeek(); // 星期二
int len = date.lengthOfMonth(); // 30 (6月份的天数)
boolean leap = date.isLeapYear(); // false (不是闰年)

LocalDate date = LocalDate.of(2014, Month.JUNE, 10);
date = date.withYear(2015); // 2015-06-10
date = date.plusMonths(2); // 2015-08-10
date = date.minusDays(1); // 2015-08-09

LocalDateTime的其他方法跟LocalDate和LocalTime相似。

这种相似的方法模式非常有利于API的学习。下面总结了用到的方法前缀:

of: 静态工厂方法,从组成部分中创建实例
from: 静态工厂方法,尝试从相似对象中提取实例。from()方法没有of()方法类型安全
now: 静态工厂方法,用当前时间创建实例
parse: 静态工厂方法,总字符串解析得到对象实例
get: 获取时间日期对象的部分状态
is: 检查关于时间日期对象的描述是否正确
with: 返回一个部分状态改变了的时间日期对象拷贝
plus: 返回一个时间增加了的、时间日期对象拷贝
minus: 返回一个时间减少了的、时间日期对象拷贝
to: 把当前时间日期对象转换成另外一个,可能会损失部分状态
at: 用当前时间日期对象组合另外一个,创建一个更大或更复杂的时间日期对象
format: 提供格式化时间日期对象的能力

LocalTime now = LocalTime.now(); // 11:09:09.240
LocalTime now = LocalTime.now().withNano(0)); // 11:09:09
LocalTime zero = LocalTime.of(0, 0, 0); // 00:00:00
LocalTime mid = LocalTime.parse("12:00:00"); // 12:00:00

JDBC:

SQL -> Java
--------------------------
date -> LocalDate
time -> LocalTime
timestamp -> LocalDateTime

TemporalAdjuster

import static java.time.DayOfWeek.*
import static java.time.temporal.TemporalAdjusters.*

LocalDate date = LocalDate.of(2014, Month.JUNE, 10);
date = date.with(lastDayOfMonth());
date = date.with(nextOrSame(WEDNESDAY));

时间点

在处理时间和日期的时候,我们通常会想到年,月,日,时,分,秒。然而,这只是时间的一个模型,是面向人类的。第二种通用模型是面向机器的,或者说是连续的。在此模型中,时间线中的一个点表示为一个很大的数。这有利于计算机处理。在UNIX中,这个数从1970年开始,以秒为的单位;同样的,在Java中,也是从1970年开始,但以毫秒为单位。

java.time包通过值类型Instant提供机器视图。Instant表示时间线上的一点,而不需要任何上下文信息,例如,时区。概念上讲,它只是简单的表示自1970年1月1日0时0分0秒(UTC)开始的秒数。因为java.time包是基于纳秒计算的,所以Instant的精度可以达到纳秒级。

Instant start = Instant.now();
// perform some calculation
Instant end = Instant.now();
assert end.isAfter(start);

Instant典型的用法是,当你需要记录事件的发生时间,而不需要记录任何有关时区信息时,存储和比较时间戳。它额很多有趣的地方在于,你不能对它做什么,而不是你能做什么。
instant.get(ChronoField.MONTH_OF_YEAR);
instant.plus(6, ChronoUnit.YEARS);

时区

ZoneId zone = ZoneId.of("Europe/Paris");

LocalDate date = LocalDate.of(2014, Month.JUNE, 10);
ZonedDateTime zdt1 = date.atStartOfDay(zone);

Instant instant = Instant.now();
ZonedDateTime zdt2 = instant.atZone(zone);

最恼人的时区问题之一就是夏令时。在夏令时中,从格林威治的偏移每年要调整两次(也许更多);典型的做法是,春天调快时间,秋天再调回来。夏令时开始时,我们都需要手动调整家里的挂钟时间。这些调整,在java.time包中,叫偏移过渡。春天时,跟本地时间相比,缺了一段时间;相反,秋天时,有的时间会出现两次。

ZonedDateTime在它的工厂方法和控制方法中处理了这些。例如,在夏令时切换的那天,增加一天会增加逻辑上的一天:可能多于24小时,也可能少于24小时。同样的,方法atStartOfDay()之所以这样命名,是因为你不能假定它的处理结果就一定是午夜零点,夏令时开始的那天,一天是从午夜1点开始的。

下面是关于夏令时的最后一个小提示。如果你想证明,在夏令时结束那天的重叠时段,你有考虑过什么情况会发生,你可以用这两个专门处理重叠时段的方法之一:

zdt = zdt.withEarlierOffsetAtOverlap();
zdt = zdt.withLaterOffsetAtOverlap();

时间长度

Period sixMonths = Period.ofMonths(6);
LocalDate date = LocalDate.now();
LocalDate future = date.plus(sixMonths);

解析和格式化

java.time.format包是专门用来格式化输出时间/日期的。这个包围绕DateTimeFormatter类和它的辅助创建类DateTimeFormatterBuilder展开。

静态方法加上DateTimeFormatter中的常量,是最通用的创建格式化器的方式。包括:

常用ISO格式常量,如ISO_LOCAL_DATE
字母模式,如ofPattern(“dd/MM/uuuu”)
本地化样式,如ofLocalizedDate(FormatStyle.MEDIUM)

DateTimeFormatter f = DateTimeFormatter.ofPattern("dd/MM/uuuu");
LocalDate date = LocalDate.parse("24/06/2014", f);
String str = date.format(f);

参考:http://www.importnew.com/14857.html
http://www.liaoxuefeng.com/article/00141939241051502ada88137694b62bfe844cd79e12c32000

spring实现多数据源同时访问(跨数据库查询)

假如配置了两个数据源dataSourceOne、dataSourceTwo

  <bean id="dynamicDataSource" class="com.core.DynamicDataSource">  
        <property name="targetDataSources">  
            <map key-type="java.lang.String">  
                <entry value-ref="dataSourceOne" key="dataSourceOne"></entry>  
                <entry value-ref="dataSourceTwo" key="dataSourceTwo"></entry>  
            </map>  
        </property>  
        <property name="defaultTargetDataSource" ref="dataSourceOne">  
        </property>  
    </bean> 

DynamicDataSource.class

package com.core;  

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;  

public class DynamicDataSource extends AbstractRoutingDataSource{  

    @Override  
    protected Object determineCurrentLookupKey() {  
        return DatabaseContextHolder.getCustomerType();   
    }  

}  

DatabaseContextHolder.class

package com.core;  

public class DatabaseContextHolder {  

    private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>();  

    public static void setCustomerType(String customerType) {  
        contextHolder.set(customerType);  
    }  

    public static String getCustomerType() {  
        return contextHolder.get();  
    }  

    public static void clearCustomerType() {  
        contextHolder.remove();  
    }  
}  

DataSourceInterceptor.class

package com.core;  

import org.aspectj.lang.JoinPoint;  
import org.springframework.stereotype.Component;  

@Component  
public class DataSourceInterceptor {  

    public void setdataSourceOne(JoinPoint jp) {  
        DatabaseContextHolder.setCustomerType("dataSourceOne");  
    }  

    public void setdataSourceTwo(JoinPoint jp) {  
        DatabaseContextHolder.setCustomerType("dataSourceTwo");  
    }  
}  

aop配置

    <aop:config>  
        <aop:aspect id="dataSourceAspect" ref="dataSourceInterceptor">  
            <aop:pointcut id="daoOne" expression="execution(* com.dao.one.*.*(..))" />  
            <aop:pointcut id="daoTwo" expression="execution(* com.dao.two.*.*(..))" />  
            <aop:before pointcut-ref="daoOne" method="setdataSourceOne" />  
            <aop:before pointcut-ref="daoTwo" method="setdataSourceTwo" />  
        </aop:aspect>  

RabbitMq整合spring简单样例

生产者
spring 配置如下:

<rabbit:connection-factory id="connectionFactory" host="localhost" port="5672" publisher-confirms="true" virtual-host="/" username="absurd" password="absurd" />
<!--下面只有当声明了exchange和队列才可以使用->
<!-- <rabbit:queue id="queue" durable="true" auto-delete="false" exclusive="false" name="queue"/>
<rabbit:queue id="queue2" durable="true" auto-delete="false" exclusive="false" name="queue2"/>
        将队列绑定到交换路由同时与key绑定
    <rabbit:fanout-exchange name="absurd_exchange" durable="true" auto-delete="false" id="absurd_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue" />
            <rabbit:binding queue="queue2" />
        </rabbit:bindings>
    </rabbit:fanout-exchange> 
 <rabbit:template id="rabbitTemplate" exchange="absurd_exchange" connection-factory="connectionFactory"/>  -->
<rabbit:template id="rabbitTemplate"  connection-factory="connectionFactory"/> 

service

@Service
public class ProducerServiceImpl implements ProducerService{

    @Autowired private RabbitTemplate rabbitTemplate;
    public void sendMessage(String msg, String routingKey,String exchange) {
        System.err.println("err"+msg+routingKey+exchange);
        RabbitAdmin admin = new RabbitAdmin(this.rabbitTemplate.getConnectionFactory());
        admin.declareExchange(new FanoutExchange(exchange,true,false));  
        admin.declareQueue(new Queue(routingKey,true,false,false) );
        admin.declareBinding(new Binding(routingKey, DestinationType.QUEUE, exchange, routingKey, null));//如果声明了队列、exchange、绑定后就无需使用RabbitAdmin 
        rabbitTemplate.convertAndSend(exchange,routingKey,msg);
        rabbitTemplate.convertAndSend(routingKey,msg);
    }

}

controller

    @RequestMapping(value="/publish/{msg}/{queue}/{exchange}",method=RequestMethod.GET)
    public ModelAndView publish(@PathVariable(value="msg")String msg,@PathVariable(value="queue")String queue,@PathVariable(value="exchange")String exchange){
        ModelAndView mv = new ModelAndView();
        producerService.sendMessage(msg, queue,exchange);
        System.out.println(msg);
        mv.setViewName("a");
        mv.addObject("msg", msg);
        return mv;

    }

消费者
spring配置

    <!-- 连接工厂 -->
    <rabbit:connection-factory id="connectionFactory" host="localhost" publisher-confirms="true" virtual-host="/" username="absurd" password="absurd" />
    <!-- 监听器 -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
        <!-- queues是队列名称,可填多个,用逗号隔开, method是ref指定的Bean调用Invoke方法执行的方法名称 -->
        <rabbit:listener queues="queue" method="onMessage" ref="redQueueListener" />
    </rabbit:listener-container>
    <!-- 队列声明 -->
    <rabbit:queue name="queue" durable="true" />

       <!-- 配置线程池 -->  
<bean id ="taskExecutor"  class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >  
    <!-- 线程池维护线程的最少数量 -->  
<property name ="corePoolSize" value ="5" />  
    <!-- 线程池维护线程所允许的空闲时间 -->  
<property name ="keepAliveSeconds" value ="30000" />  
    <!-- 线程池维护线程的最大数量 -->  
<property name ="maxPoolSize" value ="1000" />  
    <!-- 线程池所使用的缓冲队列 -->  
<property name ="queueCapacity" value ="200" />  
    </bean>  
    <!-- 红色监听处理器 -->
    <bean id="redQueueListener" class="com.absurd.rabbitmqcustomer.RedQueueListener"  />

监听方法

public class RedQueueListener {
    private static Logger log = LoggerFactory.getLogger(RedQueueListener.class);
    /**
     * 处理消息
     * @param message
     * void
     */
    public void onMessage(String message) {
        log.info("RedQueueListener Receved:"  + message);
    }
}

rabbitmq引入:

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.5.6.RELEASE</version>
        </dependency>

效果:
访问http://localhost:8080/rabbitmqproducer/hello/publish/bfdbdfbdfg/queue/absurd_exchange5
消费者就能监听到消息

mysql 异常集锦&小技巧

错误:单独运行正常,使用union all后某字段内容被截断

现象:
单独运行时:
balalala|#ss##184||balalala|#ccv##185||balalala@balalala|#gfvb##186||www@balalala|#dsfdsf什么鬼啊##187||www@www|#cdfsf东方飒##188||www@www|#成的非师范##189||www@www|#fcdsf的非师范##190||www@www|#第三方##191||www@www|#dsfs东方飒菜单树##192||157***_6623|#fdsf东方飒##193||www@157**_6623|#fdsf##194||157***6623@www|#东方飒东方飒##195

union all 后
balalala|#ss##184||balalala|#ccv##185||balalala@balalala|#gfvb##186||www@balalala|#dsfdsf什么鬼啊##187||www@www|#cdfsf东方飒##188||www@www|#成的非师范##189||www@www|#fcdsf的非师范##190||www@www|#第三方##191||www@www|#dsfs东方飒菜单树##192||157***6623|#fdsf东方飒##193||www@157***662

解决方法:
对字段采用
CONVERT(字段,BINARY) AS replystr

原因:
该字段是查询拼接的,会被自动封装成字符串类型且长度比较小

技巧:对手机号加星

INSERT (phoneno, 4, 4, '****')

ThreadLocal来代替synchronized更好地实现线程安全问题

package com.test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class ConnectionManager {

    private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() {
        @Override
        protected Connection initialValue() {
            Connection conn = null;
            try {
                conn = DriverManager.getConnection(
                        "jdbc:mysql://localhost:3306/test", "username",
                        "password");
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return conn;
        }
    };

    public static Connection getConnection() {
        return connectionHolder.get();
    }

    public static void setConnection(Connection conn) {
        connectionHolder.set(conn);
    }
}

mysql查询优化技巧

http://blog.jobbole.com/24006/

索引区分度

区分度: 指字段在数据库中的不重复比
区分度在新建索引时有着非常重要的参考价值,在MySQL中,区分度的计算规则如下:

字段去重后的总数与全表总记录数的商。

例如:
select count(distinct(name))/count(*) from t_base_user;

count(distinct(name))/count(*)

1.0000

其中区分度最大值为1.000,最小为0.0000,区分度的值越大,也就是数据不重复率越大,新建索引效果也越好,在主键以及唯一键上面的区分度是最高的,为1.0000,在状态,性别等字段上面的区分度值是最小的。 (这个就要看数据量了,如果只有几条数据,这时区分度还挺高的,如果数据量多,区分度基本为0.0000。也就是在这些字段上添加索引后,效果也不佳的原因。)

值得注意的是: 如果表中没有任何记录时,计算区分度的结果是为空值,其他情况下,区分度值均分布在0.0000-1.0000之间。

个人强烈建议, 建索引时,一定要先计算该字段的区分度,原因如下:

1、单列索引

可以查看该字段的区分度,根据区分度的大小,也能大概知道在该字段上的新建索引是否有效,以及效果如何。区分度越大,索引效果越明显。

2、多列索引(联合索引)

多列索引中其实还有一个字段的先后顺序问题,一般是将区分度较高的放在前面,这样联合索引才更有效,例如:

select * from t_base_user where name="" and status=1;

像上述语句,如果建联合索引的话,就应该是:

alter table t_base_user add index idx_name_status(name,status);

而不是:

alter table t_base_user add index idx_status_name(status,name);

最左前缀匹配原则

MySQL会一直向右匹配直到遇到范围查询(>、<、between、like)就停止匹配,比如
select * from t_base_user where type="10" and created_at<"2017-11-03" and status=1, (该语句仅作为演示)

在上述语句中,status就不会走索引,因为遇到<时,MySQL已经停止匹配,此时走的索引为:(type,created_at),其先后顺序是可以调整的,而走不到status索引,此时需要修改语句为:

select * from t_base_user where type=10 and status=1 and created_at<"2017-11-03"

举例:
CREATE TABLE titles (
id varchar(50) NOT NULL DEFAULT '',
emp_no varchar(50) NOT NULL DEFAULT '',
title varchar(50) NOT NULL DEFAULT '',
from_date datetime DEFAULT NULL COMMENT 'from',
to_date datetime DEFAULT NULL,
date_create datetime DEFAULT NULL,
date_update datetime DEFAULT NULL,
date_delete datetime DEFAULT NULL,
PRIMARY KEY (id),
KEY idx_emp (emp_no,title,from_date) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

一:全列匹配
EXPLAIN SELECT * FROM titles WHERE emp_no='10001' AND title='Senior Engineer' AND from_date='1986-06-26';

id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE titles ref idx_emp idx_emp 310 const,const,const 1

次序调换也是一样
EXPLAIN SELECT * FROM titles WHERE from_date='1986-06-26' AND emp_no='10001' AND title='Senior Engineer';

二:最左前缀匹配

EXPLAIN SELECT * FROM titles WHERE emp_no='10001';

id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE titles ref idx_emp idx_emp 152 const 1

三:查询条件用到了索引中列的精确匹配,但是中间某个条件未提供
EXPLAIN SELECT * FROM employees.titles WHERE emp_no='10001' AND from_date='1986-06-26';

id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE titles ref idx_emp idx_emp 152 const 1 Using index condition
EXPLAIN SELECT * FROM titles
WHERE emp_no='10001'
AND title IN ('Senior Engineer', 'Staff', 'Engineer', 'Senior Staff', 'Assistant Engineer', 'Technique Leader', 'Manager')
AND from_date='1986-06-26';
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE titles ALL idx_emp NULL 7 Using where

四:查询条件没有指定索引第一列

EXPLAIN SELECT * FROM titles WHERE from_date='1986-06-26';

id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE titles ALL NULL NULL NULL NULL 443308 Using where

五:匹配某列的前缀字符串
EXPLAIN SELECT * FROM employees.titles WHERE emp_no='10001' AND title LIKE 'Senior%';

id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE titles range PRIMARY PRIMARY 56 NULL 1 Using where

避免全表扫描

  1. 应尽量避免在 where 子句中使用!=或<>操作符,否则将引擎放弃使用索引而进行全表扫描
  2. 对查询进行优化,应尽量避免全表扫描,首先应考虑在 where 及 order by 涉及的列上建立索引。
  3. 应尽量避免在 where 子句中对字段进行 null 值判断,否则将导致引擎放弃使用索引而进行全表扫描
    如:
select id from t where num is null

可以在num上设置默认值0,确保表中num列没有null值,然后这样查询:

select id from t where num=0
  1. 尽量避免在 where 子句中使用 or 来连接条件,否则将导致引擎放弃使用索引而进行全表扫描,如:
 select id from t where num=10 or num=20

可以这样查询:

     select id from t where num=10
     union all
     select id from t where num=20

5.不能前置百分号

  select id from t where name like ‘hkjh%’

若要提高效率,可以考虑全文检索。

  1. in 和 not in 也要慎用,如:
 select id from t where num in(1,2,3)

对于连续的数值,能用 between 就不要用 in 了:

 select id from t where num between 1 and 3

7.如果在 where 子句中使用参数,也会导致全表扫描。因为SQL只有在运行时才会解析局部变量,但优化程序不能将访问计划的选择推迟到运行时;它必须在编译时进行选择。然 而,如果在编译时建立访问计划,变量的值还是未知的,因而无法作为索引选择的输入项。如下面语句将进行全表扫描:

     select id from t where num=@num

可以改为强制查询使用索引:

select id from t with(index(索引名)) where num=@num

8.应尽量避免在where子句中对字段进行函数操作,这将导致引擎放弃使用索引而进行全表扫描。如:

     select id from t where substring(name,1,3)=’abc’–name以abc开头的id
     select id from t where datediff(day,createdate,’2005-11-30′)=0–’2005-11-30′生成的id

应改为:

     select id from t where name like ‘abc%’
     select id from t where createdate>=’2005-11-30′ and createdate<’2005-12-1′

用 exists 代替 in

很多时候用 exists 代替 in 是一个好的选择:

     select num from a where num in(select num from b)

用下面的语句替换:

   select num from a where exists(select 1 from b where num=a.num)

使用数字型字段

1.尽量使用数字型字段,若只含数值信息的字段尽量不要设计为字符型,这会降低查询和连接的性能,并会增加存储开销。这是因为引擎在处理查询和连接时会 逐个比较字符串中每一个字符,而对于数字型而言只需要比较一次就够了。

2.尽可能的使用 varchar/nvarchar 代替 char/nchar ,因为首先变长字段存储空间小,可以节省存储空间,其次对于查询来说,在一个相对较小的字段内搜索效率显然要高些。

临时表

  • 尽量使用表变量来代替临时表。如果表变量包含大量数据,请注意索引非常有限(只有主键索引)。
  • 在新建临时表时,如果一次性插入数据量很大,那么可以使用 select into 代替 create table,避免造成大量 log ,以提高速度;如果数据量不大,为了缓和系统表的资源,应先create table,然后insert。
  • 如果使用到了临时表,在存储过程的最后务必将所有的临时表显式删除,先 truncate table ,然后 drop table ,这样可以避免系统表的较长时间锁定。

重构查询的方式

  1. 将一个复杂查询拆分为数个小且简单的查询,数据返回也快。
  2. 切分查询,如删除10万条数据,可以切分为10次,每次删除1万条。
  3. 分解关联查询:
SELECT * FROM tag
    JOIN tag_post ON tag_post.tag_id = tag.id
    JOIN post ON tag_post.post_id = post.id
WHERE tag.name = 'mysql';

分解为

SELECT * FROM tag WHERE name = 'mysql';
SELECT * FROM tag_post WHERE tag_id = 1234;
SELECT * FROM post WHERE post.id in (123,456,789,818);

4.当只要一行数据时使用 LIMIT 1

一个实例,如何对单表查询优化:

select * from product limit 866613,20 37.44秒

select id from product limit 866613,20 0.2秒(主键索引)

SELECT * FROM product WHERE ID >= (select id from product limit 866613,1) limit 20
0.2秒

慢查询基础

优化数据访问,就是优化访问的数据,操作对象是要访问的数据,两方面,是否向服务器请求了大量不需要的数据,二是是否逼迫MySQL扫描额外的记录(没有必要扫描)。

请求不需要数据的典型案例:不加LIMIT(返回全部数据,只取10条)、多表关联Select * 返回全部列(多表关联查询时*返回多个表的全部列)、还是Select *(可能写程序方面或代码复用方面有好处,但还要权衡)、重复查询相同数据(真需要这样,可以缓存下来,移动开发这个很有必要本地存储)。

标志额外扫描的三个指标:响应时间(自己判断是否合理值)、扫描的行数、返回的行数,一般扫描行数>返回行数。

扫描的行数需要与一个“访问类型”概念关联,就是 Explain 中的 type,explain的type结果由差到优分别是:ALL(全表扫描)、index(索引扫描)、range(范围扫描)、ref(唯一索引查询 key_col=xx)、const(常数引用)等。从“访问类型”可以明白,索引让 MySQL 以最高效、扫描行数最少的方式找到需要的记录。

书中有个例子,说明在where中使用已是索引的列和取消该列的索引后两种结果,type由ref变为All,预估要访问的rows从10变为5073,差异非常明显。

MySQL查询优化器的局限性

一个UNION限制,无法将限制条件从外层下推到内层,改造例子如下

(SELECT first_name,last_name
FROM sak.actor
ORDER BY last_name)
UNION ALL
(SELECT first_name,last_name
FROM sak.customer
ORDER BY last_name)
LIMIT 20;

优化后

(SELECT first_name,last_name
FROM sak.actor
ORDER BY last_name
LIMIT 20)
UNION ALL
(SELECT first_name,last_name
FROM sak.customer
ORDER BY last_name
LIMIT 20)
LIMIT 20;

等值传递:讲的IN列表,MySQL会将IN列表的值传到各个过滤子句,如果IN列表太大,会造成额外消耗,优化和执行都很慢。

最大值和最小值,MySQL对 MIN()和MAX()做得不好

SELECT MIN(actor_id) FROM sak.actor WHERE first_name = 'EE';

改造后(first_name 不是索引,原来必须全表查询)

SELECT actor_id FROM sak.actor USE INDEX(PRIMARY)
WHERE first_name = 'EE' LIMIT 1;

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.