本教程将指导您如何使用Java Websocket客户端连接实时行情接口,并订阅相关数据。
步骤1:配置您的项目
确保您的项目已引入以下依赖:
jakarta.websocket-api
jakarta.websocket-client-api
fastjson2
lombok
spring-context
(如果使用Spring框架)
步骤2:创建Websocket客户端
创建一个Java类,例如 WebsocketExample
,并添加 @ClientEndpoint
和 @Component
注解。
package org.example.ws;import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.PostConstruct;
import jakarta.websocket.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;// 注册获取API KEY: www.infoway.io
// 官方对接文档:docs.infoway.io@ClientEndpoint
@Slf4j
@Component
public class WebsocketExample {// 本地session通道private static Session session;// wss连接地址 business可以为stock、crypto、common;apikey为您的凭证// 申请API KEY: www.infoway.ioprivate static final String WS_URL = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey";@PostConstructpublic void connectAll() {try {// 建立WEBSOCKET连接connect(WS_URL);// 开启自动重连startReconnection(WS_URL);} catch (Exception e) {log.error("Failed to connect to " + WS_URL + ": " + e.getMessage());}}// 自动重连机制,会开启一个定时线程判断连接是否断开,断开自动重连private void startReconnection(String s) {ScheduledExecutorService usExecutor = Executors.newScheduledThreadPool(1);Runnable usTask = () -> {if (session == null || !session.isOpen()) {log.debug("reconnection...");connect(s);}};usExecutor.scheduleAtFixedRate(usTask, 1000, 10000, TimeUnit.MILLISECONDS);}// 建立WEBSOCKET连接具体实现private void connect(String s) {try {WebSocketContainer container = ContainerProvider.getWebSocketContainer();session = container.connectToServer(WebsocketExample.class, URI.create(s));} catch (DeploymentException | IOException e) {log.error("Failed to connect to the server: {}", e.getMessage());}}// WEBSOCKET连接建立成功后会执行下面的方法@OnOpenpublic void onOpen(Session session) throws IOException, InterruptedException {System.out.println("Connection opened: " + session.getId());// 订阅实时成交明细 (代码 10000)subscribeToData(session, 10000, "BTCUSDT", "trade_trace");// 订阅实时盘口数据 (代码 10003)Thread.sleep(5000); // 间隔一段时间subscribeToData(session, 10003, "BTCUSDT", "depth_trace");// 订阅实时K线数据 (代码 10006, 1分钟K线)Thread.sleep(5000); // 间隔一段时间subscribeKlineData(session, "BTCUSDT", 1, "kline_trace");// 定时发送心跳 (每30秒)ScheduledExecutorService pingExecutor = Executors.newScheduledThreadPool(1);Runnable pingTask = WebsocketExample::ping;pingExecutor.scheduleAtFixedRate(pingTask, 30, 30, TimeUnit.SECONDS);}// 封装订阅数据请求private void subscribeToData(Session session, int code, String symbol, String trace) throws IOException {JSONObject sendObj = new JSONObject();sendObj.put("code", code);sendObj.put("trace", trace);JSONObject data = new JSONObject();data.put("codes", symbol);sendObj.put("data", data);session.getBasicRemote().sendText(sendObj.toJSONString());}// 封装订阅K线数据请求private void subscribeKlineData(Session session, String symbol, int klineType, String trace) throws IOException {JSONObject klineSendObj = new JSONObject();klineSendObj.put("code", 10006);klineSendObj.put("trace", trace);JSONObject klineData = new JSONObject();JSONArray klineDataArray = new JSONArray();JSONObject klineObj = new JSONObject();klineObj.put("type", klineType);klineObj.put("codes", symbol);klineDataArray.add(klineObj);klineData.put("arr", klineDataArray);klineSendObj.put("data", klineData);session.getBasicRemote().sendText(klineSendObj.toJSONString());}@OnMessagepublic void onMessage(String message, Session session) {// 处理接收到的消息,包含订阅成功/失败提示和行情数据System.out.println("Message received: " + message);}@OnClosepublic void onClose(Session session, CloseReason reason) {System.out.println("Connection closed: " + session.getId() + ", reason: " + reason);}@OnErrorpublic void onError(Throwable error) {error.printStackTrace();}// 发送心跳请求public static void ping() {try {JSONObject jsonObject = new JSONObject();jsonObject.put("code", 10010);jsonObject.put("trace", "heartbeat_trace");session.getBasicRemote().sendText(jsonObject.toJSONString());} catch (IOException e) {throw new RuntimeException("Failed to send heartbeat: " + e.getMessage(), e);}}
}
步骤3:理解核心方法
WS_URL
: 这是WebSocket连接地址,您需要替换 yourApikey
为您的实际凭证。
business
参数指定业务类型:stock
(股票), crypto
(加密货币), common
(通用)。
@PostConstruct connectAll()
: Spring Boot应用启动时会自动调用此方法,用于建立WebSocket连接和启动自动重连机制。
startReconnection(String s)
: 实现自动重连的逻辑,每隔10秒检查连接状态,如果断开则尝试重新连接。
connect(String s)
: 建立WebSocket连接的具体实现。
@OnOpen onOpen(Session session)
: 连接成功建立后,此方法会被调用。您可以在这里发送订阅请求。
订阅请求: 通过构建JSON对象发送订阅请求。code
字段是请求协议号,data
字段包含订阅的具体内容(例如:交易对、K线类型)。
实时成交明细: code
为 10000
。
实时盘口数据: code
为 10003
。
实时K线数据: code
为 10006
。type
字段指定K线类型(例如:1代表1分钟K线)。
心跳机制: ping()
方法会定时发送心跳请求 (code: 10010
),防止长时间不活跃导致连接断开。
@OnMessage onMessage(String message, Session session)
: 当接收到服务端推送的消息时,此方法会被调用。您可以在这里解析并处理行情数据。
@OnClose onClose(Session session, CloseReason reason)
: 连接关闭时调用。
@OnError onError(Throwable error)
: 连接发生错误时调用。
步骤4:运行您的应用
如果您使用的是Spring Boot,直接运行您的主应用类即可。WebSocket客户端会自动启动并尝试连接。
注意事项:
- API文档: 详细的
code
列表和data
格式请参考Infoway的WebSocket API文档:https://docs.infoway.io/websocket-api/endpoints - 错误处理: 生产环境中,您应该对连接、发送和接收消息中的异常进行更详细的捕获和处理。
- 自定义Trace:
trace
字段是自定义标识,可用于追踪请求响应。
通过以上步骤,您就能成功连接实时行情接口并开始接收数据了。