Spring Boot 集成 WebSocket 与 RabbitMQ

import javax.websocket.server.ServerEndpoint;

//该注解用来指定一个URI,客户端可以通过这个URI来连接到WebSocket。类似Servlet的注解mapping。无需在web.xml中配置。

import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer;

@ServerEndpoint(value = \@Component

public class MyWebSocket { //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 private static int onlineCount = 0;

//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 private static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet();

//与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session;

public static String username = null;

private static final String EXCHANGE_NAME = \ /**

* 连接建立成功调用的方法*/ @OnOpen

public void onOpen(@PathParam(\ this.session = session;

webSocketSet.add(this); //加入set中 addOnlineCount(); //在线数加1

System.out.println(\有新连接加入!当前在线人数为\ try {

ConnectionFactory factory = new ConnectionFactory(); factory.setHost(\

Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明direct类型转发器

channel.exchangeDeclare(EXCHANGE_NAME, \

String queueName = channel.queueDeclare().getQueue();

// 指定binding_key

channel.queueBind(queueName, EXCHANGE_NAME, username); channel.queueBind(queueName, EXCHANGE_NAME, \

System.out.println(\

QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody());

System.out.println(MyWebSocket.webSocketSet.size()+\ for(MyWebSocket item: MyWebSocket.webSocketSet){ if(item.equals(this)){ try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); continue; } }

} }

} catch (Exception e) {

System.out.println(\异常\ } }

/**

* 连接关闭调用的方法 */

@OnClose

public void onClose() {

webSocketSet.remove(this); //从set中删除 subOnlineCount(); //在线数减1

System.out.println(\有一连接关闭!当前在线人数为\ }

/**

* 收到客户端消息后调用的方法 *

* @param message 客户端发送过来的消息*/ @OnMessage

public void onMessage(String message, Session session) {

System.out.println(\来自客户端的消息:\

//群发消息

for (MyWebSocket item : webSocketSet) { try {

item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } } }

/**

* 发生错误时调用 */

@OnError

public void onError(Session session, Throwable error) { System.out.println(\发生错误\ error.printStackTrace(); }

public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); //this.session.getAsyncRemote().sendText(message); }

/**

* 群发自定义消息 * */

public static void sendInfo(String message) throws IOException { for (MyWebSocket item : webSocketSet) { try {

item.sendMessage(message); } catch (IOException e) { continue; } } }

public static synchronized int getOnlineCount() { return onlineCount; }

public static synchronized void addOnlineCount() { MyWebSocket.onlineCount++; }

public static synchronized void subOnlineCount() { MyWebSocket.onlineCount--; } }

8.编写GetHttpSessionConfigurator

package com.websocket;

import javax.servlet.http.HttpSession;

import javax.websocket.HandshakeResponse;

import javax.websocket.server.HandshakeRequest; import javax.websocket.server.ServerEndpoint;

import javax.websocket.server.ServerEndpointConfig;

import javax.websocket.server.ServerEndpointConfig.Configurator;

public class GetHttpSessionConfigurator extends ServerEndpointConfig.Configurator {

@Override

public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {

HttpSession httpSession = (HttpSession)request.getHttpSession();

config.getUserProperties().put(HttpSession.class.getName(),httpSession); } } 10.编写WebSocketConfig

package com;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Spring configuration that contributes a {@link ServerEndpointExporter} bean.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}

11.当然, Erlang和RabbitMQ服务要提前安装好。

在运行之前我们要启动该服务,

12.双击即可

13.之后就可以启动项目,进行消息的生产和消费了。