`

Mina 断线重连

    博客分类:
  • MINA
阅读更多

Mina 断线重连

    定义:这里讨论的Mina 断线重连是指使用mina作为客户端软件,连接其他提供Socket通讯服务的服务器端。Socket服务器可以是Mina提供的服务器,也可以是C++提供的服务器。

 



   
 
 

 

一、断线重连的方式;

    1. 在创建Mina客户端时增加一个监听器,或者增加一个拦截器,当检测到Session关闭时,自动进行重连。

 
   
 

 

    2. 在第1种方式的基础上,增加客户端的读写通道空闲检查,当发生Session关闭或者读写空闲时,进行重连。

 

 
   
 

 

        第一种方式比较传统,优点是简单方便,适合网络稳定、数据量不大(1M带宽以下)的环境;不过缺点是不能对系统级的连接断开阻塞进行捕获。

       

        第二种方式更加精细,基本上能捕获到应用、网络、系统级的断连。

 

二、重连目的:

        在使用Mina做为客户端时,往往因为网络、服务器、应用程序出现问题而导致连接断开,而自动重连,就是解决连接断开的唯一方式。如果网线断开、服务器宕机、应用程序挂了,都是断线的原因,这个时候,通过增加一个监听器或者拦截器,就能实现重连。但是生产环境中,断线的原因可能更复杂:网络不稳定、延时、服务器负载高、服务器或者应用程序的发送或者接收缓冲区满等等问题都可能导致数据传输过程出现类似于断线的情况,这个时候,光检测Session关闭是远远不够的,这个时候就需要一种重连机制,比如读写空闲超过30秒,就进行重连。对于数据不间断、实时性高、数据量大的应用场景,更是实用。

 

 

三、实例:

    第一种:监听器方式

       创建一个监听器实现mina的IoServiceListener接口,里面的方法可以不用写实现

import org.apache.mina.core.service.IoService;
import org.apache.mina.core.service.IoServiceListener;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

public class IoListener implements IoServiceListener{
	@Override
	public void serviceActivated(IoService arg0) throws Exception {
		// TODO Auto-generated method stub
	}
	@Override
	public void serviceDeactivated(IoService arg0) throws Exception {
		// TODO Auto-generated method stub
	}
	@Override
	public void serviceIdle(IoService arg0, IdleStatus arg1) throws Exception {
		// TODO Auto-generated method stub
	}
	@Override
	public void sessionCreated(IoSession arg0) throws Exception {
		// TODO Auto-generated method stub
	}

	@Override
	public void sessionDestroyed(IoSession arg0) throws Exception {
		// TODO Auto-generated method stub
	}
}

    

 再创建客户端时加入监听

		NioSocketConnector connector = new NioSocketConnector();  //创建连接客户端
		connector.setConnectTimeoutMillis(30000); //设置连接超时
		connector.getSessionConfig().setReceiveBufferSize(10240);	// 设置接收缓冲区的大小
		connector.getSessionConfig().setSendBufferSize(10240);// 设置输出缓冲区的大小
//		加入解码器
		TextLineCodecFactory factory = new TextLineCodecFactory(Charset.forName("GBK"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue());
		factory.setDecoderMaxLineLength(10240);
		factory.setEncoderMaxLineLength(10240);
		connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(factory));
		connector.setDefaultRemoteAddress(new InetSocketAddress(host, port));// 设置默认访问地址
		//添加处理器
                connector.setHandler(new IoHandler()); 
		
                // 添加重连监听
		connector.addListener(new IoListener() {
			@Override
			public void sessionDestroyed(IoSession arg0) throws Exception {
				for (;;) {
					try {
						Thread.sleep(3000);
						ConnectFuture future = connector.connect();
						future.awaitUninterruptibly();// 等待连接创建成功
						session = future.getSession();// 获取会话
						if (session.isConnected()) {
							logger.info("断线重连[" + connector.getDefaultRemoteAddress().getHostName() + ":" + connector.getDefaultRemoteAddress().getPort() + "]成功");
							break;
						}
					} catch (Exception ex) {
						logger.info("重连服务器登录失败,3秒再连接一次:" + ex.getMessage());
					}
				}
			}
		});
                for (;;) {
			try {
				ConnectFuture future = connector.connect();
				future.awaitUninterruptibly(); // 等待连接创建成功  
		                session = future.getSession(); // 获取会话   
				logger.info("连接服务端" + host + ":" + port + "[成功]" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
				break;
			} catch (RuntimeIoException e) {
				logger.error("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage(), e);
				Thread.sleep(5000);// 连接失败后,重连间隔5s
			}
		}

 

 

    第一种:拦截器方式

 

		connector = new NioSocketConnector();  //创建连接客户端
		connector.setConnectTimeoutMillis(30000); //设置连接超时
//		断线重连回调拦截器
		connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter() {
			@Override
			public void sessionClosed(NextFilter nextFilter, IoSession ioSession) throws Exception {
				for(;;){
					try{
						Thread.sleep(3000);
						ConnectFuture future = connector.connect();
						future.awaitUninterruptibly();// 等待连接创建成功
						session = future.getSession();// 获取会话
						if(session.isConnected()){
							logger.info("断线重连["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");
							break;
						}
					}catch(Exception ex){
						logger.info("重连服务器登录失败,3秒再连接一次:" + ex.getMessage());
					}
				}
			}
		});
		
		TextLineCodecFactory factory = new TextLineCodecFactory(Charset.forName(encoding), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue());
		factory.setDecoderMaxLineLength(10240);
		factory.setEncoderMaxLineLength(10240);
		//加入解码器
		connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(factory));
		//添加处理器
                connector.setHandler(new IoHandler());
		connector.getSessionConfig().setReceiveBufferSize(10240);	// 设置接收缓冲区的大小
		connector.getSessionConfig().setSendBufferSize(10240);          // 设置输出缓冲区的大小
		connector.setDefaultRemoteAddress(new InetSocketAddress(host, port));// 设置默认访问地址
		for (;;) {
			try {
				ConnectFuture future = connector.connect();
				// 等待连接创建成功
				future.awaitUninterruptibly();
				// 获取会话
				session = future.getSession();
				logger.error("连接服务端" + host + ":" + port + "[成功]" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
				break;
			} catch (RuntimeIoException e) {
				logger.error("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage(), e);
				Thread.sleep(5000);// 连接失败后,重连间隔5s
			}
		}

 

 

 

第二种:加入空闲检测机制

        空闲检测机制需要在创建客户端时,加入空闲超时,然后在处理器handler端的sessionIdle方法中加入一个预关闭连接的方法。让Session关闭传递到监听器或者拦截器的sessionClose方法中实现重连。

      以拦截器方式为例,在创建客户端时,加入读写通道空闲检查超时机制。

 

		connector = new NioSocketConnector();  //创建连接客户端
		connector.setConnectTimeoutMillis(30000); //设置连接超时
//		断线重连回调拦截器
		connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter() {
			@Override
			public void sessionClosed(NextFilter nextFilter, IoSession ioSession) throws Exception {
				for(;;){
					try{
						Thread.sleep(3000);
						ConnectFuture future = connector.connect();
						future.awaitUninterruptibly();// 等待连接创建成功
						session = future.getSession();// 获取会话
						if(session.isConnected()){
							logger.info("断线重连["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");
							break;
						}
					}catch(Exception ex){
						logger.info("重连服务器登录失败,3秒再连接一次:" + ex.getMessage());
					}
				}
			}
		});
		
		connector.getFilterChain().addLast("mdc", new MdcInjectionFilter());
		TextLineCodecFactory factory = new TextLineCodecFactory(Charset.forName(encoding), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue());
		factory.setDecoderMaxLineLength(10240);
		factory.setEncoderMaxLineLength(10240);
		//加入解码器
		connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(factory));

		connector.getSessionConfig().setReceiveBufferSize(10240);	// 设置接收缓冲区的大小
		connector.getSessionConfig().setSendBufferSize(10240);// 设置输出缓冲区的大小
		
		connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30000);  //读写都空闲时间:30秒
		connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 40000);//读(接收通道)空闲时间:40秒
		connector.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE, 50000);//写(发送通道)空闲时间:50秒
		
		//添加处理器
                connector.setHandler(new IoHandler()); 
		
		connector.setDefaultRemoteAddress(new InetSocketAddress(host, port));// 设置默认访问地址
		for (;;) {
			try {
				ConnectFuture future = connector.connect();
				// 等待连接创建成功
				future.awaitUninterruptibly();
				// 获取会话
				session = future.getSession();
				logger.error("连接服务端" + host + ":" + port + "[成功]" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
				break;
			} catch (RuntimeIoException e) {
				System.out.println("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage());
				logger.error("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage(), e);
				Thread.sleep(5000);// 连接失败后,重连10次,间隔30s
			}
		}

 

 

      然后在数据处理器IoHandler中sessionIdle方法中加入Session会话关闭的代码,这样session关闭就能传递到拦截器或者监听器中,然后实现重连。

 

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

public class IoHandler extends IoHandlerAdapter {
	//部分代码忽略...
	@Override
	public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
		logger.info("-客户端与服务端连接[空闲] - " + status.toString());
		if(session != null){
			session.close(true);
		}
	}
	//部分代码忽略...
}

 

 总结-最佳实践:

       以上两种方式我个人认为最好是使用第二种。在实际的生产环境,对于数据量比较少的情况下,需要加一个线程专门发送心跳信息,然后在服务器端进行回应心跳,这样就保证读写通道不出现空闲。如果数据量比较大,大到24小时都有数据,那么就不需要心跳线程,可以直接在IoHandler处理器端中messageReceived方法中定时发送心跳到服务器。由于读写监控还可以处理服务器、网络、应用等等方面的不确定因素,所以建议使用第二种方式。

 

另外有几点注意事项:

第一:断线重连是针对长连接的,也就是说,连接后两端一直在发送数据。

第二:断线重连是针对客户端的,如果你在服务端使用,可能会有根据场景导致失败。因为服务端会自动发送心跳。

 

第二:断线重连如果测试设置接收超时情况,就应该一直发送数据,而服务端只接收不发送,心跳数据也不发送,到时间后才会起作用。

 

 

  • 大小: 50.9 KB
  • 大小: 49.5 KB
  • 大小: 58.4 KB
分享到:
评论
16 楼 rogue2yjg 2018-12-04  
为什么不直接在IoHandler里的sessionClosed方法里加入重连代码,而是要再加一个Filter或者Listener?
15 楼 hgrapple 2018-03-09  
死锁异常DEAD LOCK: IoFuture.await() was invoked from an I/O processor thread. Please use IoFutureListener or configure a proper thread model alternatively.求解决办法!!
14 楼 q178266871 2017-08-10  
freedomszq 写道
重连是出现DEAD LOCK: IoFuture.await() was invoked from an I/O processor thread.  Please use IoFutureListener or configure a proper thread model alternatively.
要怎样解决呢


这段代码就可以实现断线重连了 
       while(true){
            try {
                ConnectFuture future = connector.connect();
                future.awaitUninterruptibly(); // 等待连接创建成功
                session = future.getSession(); // 获取会话
                if (session.isConnected()) {
                    Log.d("socket", "连接服务端" + host + ":" + port + "[成功]" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                }
                SystemClock.sleep(30000);
            } catch (Exception e) {
                Log.d("socket","连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + e.getMessage(), e);
              // TODO  textSpeaker.speak("socket connect error");
                Log.d("socket","重连服务器登录失败,25秒再连接一次:" + e.getMessage());

            }
        }

在   // 监听客户端是否断线  这段代码中不需要再次 写一遍 会死锁的 
        connector.addListener(new IoListener() {
            @Override
            public void sessionDestroyed(IoSession arg0) throws Exception {
                // TODO 监听客户端是否断线
                Log.d("socket", "sessionDestroyed 连接断开");
            }

            @Override
            public void sessionClosed(IoSession session) throws Exception {}
        });
13 楼 freedomszq 2017-04-20  
重连是出现DEAD LOCK: IoFuture.await() was invoked from an I/O processor thread.  Please use IoFutureListener or configure a proper thread model alternatively.
要怎样解决呢
12 楼 wellway 2016-11-10  
fenyun689 写道
知道哪里错了。谢谢。

我的也出现这个问题,请问你是怎么解决的?
11 楼 luhang0102 2016-05-23  
重连的时候future.awaitUninterruptibly()没响应怎么办啊,走到这儿就死了
10 楼 fenyun689 2016-04-26  
fenyun689 写道


拦截器没有生效,网络断了不会重连。


public void connect(){
		
		 //Create TCP/IP connection     
	    connector = new NioSocketConnector(1); 
        
        connector.getFilterChain().addFirst("reconnection",
				new IoFilterAdapter() {
					@Override
					public void sessionClosed(NextFilter nextFilter,
							IoSession ioSession) throws Exception {
						for (;;) {
							try {
								Thread.sleep(10*1000);
								ConnectFuture future = connector.connect();
								future.awaitUninterruptibly();// 等待连接创建成功
								IoSession session = future.getSession();// 获取会话
								if (session.isConnected()) {
									log.info("断线重连["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");
									break;
								}
							} catch (Exception ex) {
								    log.info("重连服务器登录失败,10秒再连接一次:" + ex.getMessage());
							}
						}
					}
				});
             
        //创建接受数据的过滤器     
        DefaultIoFilterChainBuilder chain = connector.getFilterChain();     
             
        //设置报文组装和解析器 (重点)    
        chain.addLast("package_deal", new ProtocolCodecFilter(new ClientDataCodecFactory()));     
             
        //客户端的消息处理器(重点)     
        connector.setHandler(new ClientDataHandler(this.stakeNo));     
             
        //set connect timeout     
        connector.setConnectTimeoutMillis(3000);  
             
        //连接到服务器:     
        cf = connector.connect(new InetSocketAddress(this.serverIp,this.serverPort));     
             
        //Wait for the connection attempt to be finished.     
        cf.awaitUninterruptibly();     
             
        while(!cf.isConnected()){
        	
        	cf = connector.connect(new InetSocketAddress(this.serverIp,this.serverPort));
        	cf.awaitUninterruptibly();
        	
        	//每个10秒连接一次
        	try {
				Thread.sleep(10*1000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				log.info("重连服务器登录失败,10秒再连接一次:" + e.getMessage());
				e.printStackTrace();
			}
        }
        
        //设置读缓存
        connector.getSessionConfig().setReadBufferSize(2*1024*1024);
      
	}

 


    

9 楼 fenyun689 2016-04-01  
知道哪里错了。谢谢。
8 楼 fenyun689 2016-04-01  
我用你提供的的 拦截器方式的代码。
重连时提示:“重连服务器登录失败,10秒再连接一次:defaultRemoteAddress is not set.”

因为变量 future 提示错误,我就在方法外定义的该变量。
你给看下哪一点有问题。
public void connect(){
		
		 //Create TCP/IP connection     
	    connector = new NioSocketConnector(1); 
        
        connector.getFilterChain().addFirst("reconnection",
				new IoFilterAdapter() {
					@Override
					public void sessionClosed(NextFilter nextFilter,
							IoSession ioSession) throws Exception {
						for (;;) {
							try {
								Thread.sleep(10*1000);
								ConnectFuture future = connector.connect();
								future.awaitUninterruptibly();// 等待连接创建成功
								IoSession session = future.getSession();// 获取会话
								if (session.isConnected()) {
									log.info("断线重连["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");
									break;
								}
							} catch (Exception ex) {
								    log.info("重连服务器登录失败,10秒再连接一次:" + ex.getMessage());
							}
						}
					}
				});
             
        //创建接受数据的过滤器     
        DefaultIoFilterChainBuilder chain = connector.getFilterChain();     
             
        //设置报文组装和解析器 (重点)    
        chain.addLast("package_deal", new ProtocolCodecFilter(new ClientDataCodecFactory()));     
             
        //客户端的消息处理器(重点)     
        connector.setHandler(new ClientDataHandler(this.stakeNo));     
             
        //set connect timeout     
        connector.setConnectTimeoutMillis(3000);  
             
        //连接到服务器:     
        cf = connector.connect(new InetSocketAddress(this.serverIp,this.serverPort));     
             
        //Wait for the connection attempt to be finished.     
        cf.awaitUninterruptibly();     
             
        while(!cf.isConnected()){
        	
        	cf = connector.connect(new InetSocketAddress(this.serverIp,this.serverPort));
        	cf.awaitUninterruptibly();
        	
        	//每个10秒连接一次
        	try {
				Thread.sleep(10*1000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				log.info("重连服务器登录失败,10秒再连接一次:" + e.getMessage());
				e.printStackTrace();
			}
        }
        
        //设置读缓存
        connector.getSessionConfig().setReadBufferSize(2*1024*1024);
      
	}

 


    
7 楼 chwshuang 2016-03-30  
fenyun689 写道
你用拦截器 用 sessionClosed 方式 会影响正常的关闭吧?


影响正常关闭?你说的是短连接吧!我这个方案是适合大数据量的长连接场景!
6 楼 fenyun689 2016-03-29  
你用拦截器 用 sessionClosed 方式 会影响正常的关闭吧?

5 楼 java029 2016-03-03  
受教了,谢谢分享
4 楼 chwshuang 2015-10-15  
u010457784 写道
请教个问题,现在的逻辑是在sessionIdle()中增加session.close可以出发到拦截器,然后实现重连。但是有一个问题,当程序退出的时候,也需要将这个session关闭,此时应该也会触发到了拦截器,这时候应该怎么办呢?
是不是需要在重连之前设置一个标志位判断呢?楼主有没有考虑到这个问题



你指的程序退出是客户端程序退出的话,那么关闭session就不会走sessionIdle ,会走sessionClose,这时会自动关闭session。除非你要在session关闭前做其他操作,可以在sessionClose中做处理。
3 楼 u010457784 2015-09-30  
请教个问题,现在的逻辑是在sessionIdle()中增加session.close可以出发到拦截器,然后实现重连。但是有一个问题,当程序退出的时候,也需要将这个session关闭,此时应该也会触发到了拦截器,这时候应该怎么办呢?
是不是需要在重连之前设置一个标志位判断呢?楼主有没有考虑到这个问题
2 楼 chwshuang 2014-11-13  
ikrboy 写道
connector.getSessionConfig().setIdleTime时间的单位应该是秒吧,并且通过检测空闲时间来重连可取吗,空闲状态不是表明连接尚未断开吗?


是秒。空闲也表示连接尚未断开。
但是,Mina中由于缓存池满,系统连接异常、网卡异常等问题,这个状态是有作用的。这里是加的双重报销,也是在生产环境中遇到问题后找出的解决方案,仅供参考。
1 楼 ikrboy 2014-09-03  
connector.getSessionConfig().setIdleTime时间的单位应该是秒吧,并且通过检测空闲时间来重连可取吗,空闲状态不是表明连接尚未断开吗?

相关推荐

Global site tag (gtag.js) - Google Analytics