diff --git a/pom.xml b/pom.xml
index 870d991..13a21d0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,6 +42,11 @@
+
+ org.springframework.boot
+ spring-boot-starter-data-mongodb
+
+
com.alibaba
druid-spring-boot-starter
@@ -185,6 +190,13 @@
30.1-jre
+
+ com.thomsonreuters.ema
+ ema
+ 3.5.1.0
+
+
+
diff --git a/src/main/java/cn/stock/market/dto/RetifiveStockInfo.java b/src/main/java/cn/stock/market/dto/RetifiveStockInfo.java
new file mode 100644
index 0000000..8206dff
--- /dev/null
+++ b/src/main/java/cn/stock/market/dto/RetifiveStockInfo.java
@@ -0,0 +1,32 @@
+package cn.stock.market.dto;
+
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * @author gs
+ * @date 2024/4/9 10:53
+ */
+@Data
+@Builder
+/**
+ * RetifiveStockInfo类用于存储股票信息。
+ * 该类包含了股票的基本信息和交易数据。
+ */
+public class RetifiveStockInfo {
+ private String stockCode; // 股票代码
+ private String symbol; // 交易代码
+ private String stockName; // 股票名称
+ private String stockType; // 股票类型
+ private String currentPrice; // 当前价格
+ private String changePercent; // 变动百分比
+ private String openPrice; // 开盘价格
+ private String previousPrice; // 昨日收盘价格
+ private String volume; // 成交量
+ private String highPrice; // 最高价格
+ private String lowPrice; // 最低价格
+ private String week52HighPrice; // 52周最高价格
+ private String week52LowPrice; // 52周最低价格
+ private String status;
+}
+
diff --git a/src/main/java/cn/stock/market/listener/AppClient.java b/src/main/java/cn/stock/market/listener/AppClient.java
new file mode 100644
index 0000000..c6f3fb2
--- /dev/null
+++ b/src/main/java/cn/stock/market/listener/AppClient.java
@@ -0,0 +1,84 @@
+package cn.stock.market.listener;
+
+import com.thomsonreuters.ema.access.AckMsg;
+import com.thomsonreuters.ema.access.EmaFactory;
+import com.thomsonreuters.ema.access.GenericMsg;
+import com.thomsonreuters.ema.access.Msg;
+import com.thomsonreuters.ema.access.OmmConsumer;
+import com.thomsonreuters.ema.access.OmmConsumerClient;
+import com.thomsonreuters.ema.access.OmmConsumerConfig;
+import com.thomsonreuters.ema.access.OmmConsumerEvent;
+import com.thomsonreuters.ema.access.RefreshMsg;
+import com.thomsonreuters.ema.access.ReqMsg;
+import com.thomsonreuters.ema.access.StatusMsg;
+import com.thomsonreuters.ema.access.UpdateMsg;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Service
+@Slf4j
+public class AppClient implements OmmConsumerClient {
+
+ private CompletableFuture messageFuture = new CompletableFuture<>();
+ private OmmConsumer consumer;
+
+
+
+ @Autowired
+ public AppClient(OmmConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+ // 根据itemName进行订阅的方法
+ public void subscribe(String itemName) {
+ resetMessageFuture(); // 重置Future以便新的请求
+ ReqMsg reqMsg = EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").name(itemName);
+ consumer.registerClient(reqMsg, this);
+ }
+
+
+ public CompletableFuture getMessageFuture() {
+ return messageFuture;
+ }
+
+ private void resetMessageFuture() {
+ this.messageFuture = new CompletableFuture<>();
+ }
+
+ @Override
+ public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event) {
+ log.error("监听的消息:"+refreshMsg.toString());
+ messageFuture.complete(refreshMsg);
+ }
+
+ @Override
+ public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent consumerEvent) {
+
+ }
+
+ @Override
+ public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent consumerEvent) {
+
+ }
+
+ @Override
+ public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent) {
+
+ }
+
+ @Override
+ public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent) {
+
+ }
+
+ @Override
+ public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent) {
+
+ }
+ // 其他消息处理...
+}
diff --git a/src/main/java/cn/stock/market/listener/ConcurrentAppClient.java b/src/main/java/cn/stock/market/listener/ConcurrentAppClient.java
new file mode 100644
index 0000000..108b4a5
--- /dev/null
+++ b/src/main/java/cn/stock/market/listener/ConcurrentAppClient.java
@@ -0,0 +1,75 @@
+package cn.stock.market.listener;
+
+import com.thomsonreuters.ema.access.AckMsg;
+import com.thomsonreuters.ema.access.EmaFactory;
+import com.thomsonreuters.ema.access.GenericMsg;
+import com.thomsonreuters.ema.access.Msg;
+import com.thomsonreuters.ema.access.OmmConsumer;
+import com.thomsonreuters.ema.access.OmmConsumerClient;
+import com.thomsonreuters.ema.access.OmmConsumerEvent;
+import com.thomsonreuters.ema.access.RefreshMsg;
+import com.thomsonreuters.ema.access.ReqMsg;
+import com.thomsonreuters.ema.access.StatusMsg;
+import com.thomsonreuters.ema.access.UpdateMsg;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author gs
+ * @date 2024/4/8 22:01
+ */
+@Service
+public class ConcurrentAppClient implements OmmConsumerClient {
+ private final ConcurrentHashMap> futures = new ConcurrentHashMap<>();
+ private OmmConsumer consumer;
+
+ @Autowired
+ public ConcurrentAppClient(OmmConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+ public CompletableFuture subscribe(String itemName) {
+ CompletableFuture future = new CompletableFuture<>();
+ futures.put(itemName, future);
+ ReqMsg reqMsg = EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").name(itemName);
+ consumer.registerClient(reqMsg, this);
+ return future;
+ }
+
+ @Override
+ public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event) {
+ String itemName = refreshMsg.name();
+ CompletableFuture future = futures.remove(itemName);
+ if (future != null) {
+ future.complete(refreshMsg);
+ }
+ }
+
+ @Override
+ public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent consumerEvent) {
+
+ }
+
+ @Override
+ public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent consumerEvent) {
+
+ }
+
+ @Override
+ public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent) {
+
+ }
+
+ @Override
+ public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent) {
+
+ }
+
+ @Override
+ public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent) {
+
+ }
+}
diff --git a/src/main/java/cn/stock/market/listener/EmaConfig.xml b/src/main/java/cn/stock/market/listener/EmaConfig.xml
new file mode 100644
index 0000000..13c878f
--- /dev/null
+++ b/src/main/java/cn/stock/market/listener/EmaConfig.xml
@@ -0,0 +1,898 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/main/java/cn/stock/market/listener/MongoConfig.java b/src/main/java/cn/stock/market/listener/MongoConfig.java
new file mode 100644
index 0000000..ea57c0f
--- /dev/null
+++ b/src/main/java/cn/stock/market/listener/MongoConfig.java
@@ -0,0 +1,25 @@
+package cn.stock.market.listener;
+
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoClient;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.mongodb.core.MongoTemplate;
+
+@Configuration
+public class MongoConfig {
+
+ @Bean
+ public MongoClient mongoClient() {
+ // 连接到MongoDB服务器(示例中使用默认的MongoDB端口)
+ // 请根据你的实际情况修改这里的连接字符串
+ return MongoClients.create("mongodb://root:123456@localhost:27017");
+ }
+
+ @Bean
+ public MongoTemplate mongoTemplate() {
+ // 创建并返回MongoTemplate的实例,需要提供MongoClient实例和数据库名
+ // 替换"yourDatabase"为你的数据库名
+ return new MongoTemplate(mongoClient(), "company");
+ }
+}
diff --git a/src/main/java/cn/stock/market/listener/MyAppRunner.java b/src/main/java/cn/stock/market/listener/MyAppRunner.java
new file mode 100644
index 0000000..3d0d71f
--- /dev/null
+++ b/src/main/java/cn/stock/market/listener/MyAppRunner.java
@@ -0,0 +1,36 @@
+package cn.stock.market.listener;
+
+import com.thomsonreuters.ema.access.EmaFactory;
+import com.thomsonreuters.ema.access.OmmConsumer;
+import com.thomsonreuters.ema.access.OmmConsumerConfig;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MyAppRunner implements ApplicationRunner {
+ static String userName = "GE-A-10288435-3-15856";
+ static String password = "8EyCWDQ%XITcGRM@RhKokxX05FZJe1";
+ static String clientId = "662b37b071144c9f8f2507d93037a019a010ae0e";
+ @Override
+ public void run(ApplicationArguments args) throws Exception {
+ // 在这里写入您的初始化代码
+ System.out.println("应用已启动,执行初始化代码...");
+
+ // The "Consumer_4" uses EncryptedProtocolType::RSSL_SOCKET while the "Consumer_5" uses EncryptedProtocolType::RSSL_WEBSOCKET predefined in EmaConfig.xml
+ }
+
+ @Bean
+ public OmmConsumer getOmmConsumer() {
+ OmmConsumer consumer = null;
+ OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig();
+ config.username(userName);
+ config.password(password);
+ config.clientId(clientId);
+ consumer = EmaFactory.createOmmConsumer(config.consumerName( "Consumer_4"));
+ return consumer;
+ }
+
+
+}
diff --git a/src/main/java/cn/stock/market/listener/StockInfoRefinitiv.java b/src/main/java/cn/stock/market/listener/StockInfoRefinitiv.java
new file mode 100644
index 0000000..dbc5254
--- /dev/null
+++ b/src/main/java/cn/stock/market/listener/StockInfoRefinitiv.java
@@ -0,0 +1,22 @@
+package cn.stock.market.listener;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+/**
+ * @author gs
+ * @date 2024/4/8 21:00
+ */
+@Document("stock_info")
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class StockInfoRefinitiv {
+ private String stockName;
+ private String stockCode;
+ private String stockType;
+ private String status;
+ private String commandStr;
+}
diff --git a/src/main/java/cn/stock/market/listener/SymbolRefinitiv.java b/src/main/java/cn/stock/market/listener/SymbolRefinitiv.java
new file mode 100644
index 0000000..a51228a
--- /dev/null
+++ b/src/main/java/cn/stock/market/listener/SymbolRefinitiv.java
@@ -0,0 +1,19 @@
+package cn.stock.market.listener;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+/**
+ * @author gs
+ * @date 2024/4/8 17:20
+ */
+@Data
+@AllArgsConstructor
+@Document("symbol")
+public class SymbolRefinitiv {
+ private String symbol;
+ private String stockType;
+ private String commandStr;
+}
diff --git a/src/main/java/cn/stock/market/web/MessageRetifiveController.java b/src/main/java/cn/stock/market/web/MessageRetifiveController.java
new file mode 100644
index 0000000..da4a91d
--- /dev/null
+++ b/src/main/java/cn/stock/market/web/MessageRetifiveController.java
@@ -0,0 +1,488 @@
+package cn.stock.market.web;
+
+import cn.stock.market.dto.RetifiveStockInfo;
+import cn.stock.market.listener.AppClient;
+import cn.stock.market.listener.ConcurrentAppClient;
+import cn.stock.market.listener.StockInfoRefinitiv;
+import cn.stock.market.listener.SymbolRefinitiv;
+import cn.stock.market.utils.ServerResponse;
+import com.google.common.collect.Lists;
+import com.thomsonreuters.ema.access.DataType;
+import com.thomsonreuters.ema.access.FieldEntry;
+import com.thomsonreuters.ema.access.FieldList;
+import com.thomsonreuters.ema.access.Map;
+import com.thomsonreuters.ema.access.MapEntry;
+import com.thomsonreuters.ema.access.RefreshMsg;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@RestController
+@Slf4j
+@RequestMapping("/retifive/**")
+public class MessageRetifiveController {
+
+ @Autowired
+ private AppClient appClient;
+
+ @Autowired
+ private ConcurrentAppClient concurrentAppClient;
+
+ @Autowired
+ private MongoTemplate mongoTemplate;
+
+ @GetMapping("/message")
+ public ServerResponse> getMessage(@RequestParam String itemName) {
+ RefreshMsg refreshMsg = null;
+ try {
+ appClient.subscribe(itemName); // 根据itemName订阅
+ // 等待消息
+ refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间,例如10秒
+ return ServerResponse.createBySuccess("操作成功",refreshMsg.toString());
+ } catch (Exception e) {
+ return ServerResponse.createByError();
+ }
+ }
+
+ @GetMapping("/getStockSymbolList")
+ public ServerResponse> getStockList() {
+ RefreshMsg refreshMsg = null;
+ List itemNameList = generateItemNames();
+ String nextLink = "";
+ for (String itemName : itemNameList) {
+ try {
+ appClient.subscribe(itemName); // 根据itemName订阅
+ // 等待消息
+ refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间,例如10秒
+ System.out.println(refreshMsg.name());
+ if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){
+ nextLink = decode1(refreshMsg.payload().fieldList(),refreshMsg.name());
+ while (StringUtils.isNotBlank(nextLink)&&!nextLink.contains("blank data")){
+ appClient.subscribe(nextLink); // 根据itemName订阅
+ // 等待消息
+ refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间,例如10秒
+ nextLink = decode1(refreshMsg.payload().fieldList(),refreshMsg.name());
+ }
+ }
+ } catch (Exception e) {
+ log.error("获取列表异常link:"+nextLink,e);
+ return ServerResponse.createByError();
+ }
+ }
+ return ServerResponse.createBySuccess("操作成功");
+ }
+
+
+ /* @GetMapping("/getStockInfoList")
+ @Deprecated
+ public ServerResponse> getStockInfoList() {
+ ExecutorService executorService = Executors.newFixedThreadPool(5); // 创建一个固定大小的线程池
+ List collect = mongoTemplate.query(SymbolRefinitiv.class).stream().collect(Collectors.toList());
+ List collect2 = mongoTemplate.query(StockInfoRefinitiv.class).stream().collect(Collectors.toList());
+
+ List difference = collect.stream()
+ .filter(symbol -> collect2.stream()
+ .noneMatch(stockInfo -> stockInfo.getCommandStr().equals(symbol.getSymbol())))
+ .collect(Collectors.toList());
+
+ List itemNameList = difference.stream().map(SymbolRefinitiv::getSymbol).collect(Collectors.toList());
+
+ // 计算每个线程应处理的元素数量
+ int size = itemNameList.size();
+ int chunkSize = size / 5; // 假设列表可以均匀分配
+ for (int i = 0; i < 5; i++) {
+ int start = i * chunkSize;
+ int end = (i == 4) ? size : (start + chunkSize); // 最后一个线程可能需要处理更多的元素
+ List sublist = itemNameList.subList(start, end);
+ // 将子列表的处理提交给线程池
+ executorService.submit(() -> {
+ for (String itemName : sublist) {
+ try {
+ CompletableFuture future = concurrentAppClient.subscribe(itemName);
+ RefreshMsg refreshMsg = future.get(10, TimeUnit.SECONDS);
+ if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){
+ // List safeList = Collections.synchronizedList(new ArrayList<>(refreshMsg.payload().fieldList()));
+ decode2(refreshMsg.payload().fieldList(),refreshMsg.name());
+ }
+ } catch (Exception e) {
+ log.error("获取列表异常link:"+itemName,e);
+ }
+ }
+ });
+ }
+
+ executorService.shutdown();
+ try {
+ if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+ executorService.shutdownNow();
+ }
+ } catch (InterruptedException ie) {
+ executorService.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+
+ return ServerResponse.createBySuccess("操作成功");
+ }*/
+
+
+ @GetMapping("/getStockInfoList2")
+ public ServerResponse> getStockInfoList2() {
+ List collect = mongoTemplate.query(SymbolRefinitiv.class).stream().map(SymbolRefinitiv::getSymbol).collect(Collectors.toList());
+
+ // 计算每个线程应处理的元素数量
+ for (String itemName : collect) {
+ try {
+ appClient.subscribe(itemName); // 根据itemName订阅
+ // 等待消息
+ RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间,例如10秒
+ if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){
+ // List safeList = Collections.synchronizedList(new ArrayList<>(refreshMsg.payload().fieldList()));
+ decode2(refreshMsg.payload().fieldList(),refreshMsg.name());
+ }
+ } catch (Exception e) {
+ log.error("获取列表异常link:"+itemName,e);
+ }
+ }
+
+ return ServerResponse.createBySuccess("操作成功");
+ }
+
+ @GetMapping("/getStockDetail")
+ public ServerResponse> getStockInfoList2(String itemName) {
+
+ // 计算每个线程应处理的元素数量
+ try {
+ appClient.subscribe(itemName); // 根据itemName订阅
+ // 等待消息
+ RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间,例如10秒
+ if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){
+ RetifiveStockInfo retifiveStockInfo = decode3(refreshMsg.payload().fieldList(), refreshMsg.name());
+ return ServerResponse.createBySuccess("操作成功",retifiveStockInfo);
+ }
+ } catch (Exception e) {
+ log.error("获取股票详情link:"+itemName,e);
+ }
+
+ return ServerResponse.createBySuccess("操作成功");
+ }
+
+
+ @GetMapping("/getStockDetailList")
+ public ServerResponse> getStockDetailList(String itemName) {
+
+ // 计算每个线程应处理的元素数量
+ String[] itemNames = itemName.split(",");
+ List list = Lists.newArrayList();
+ for (String name : itemNames) {
+ try {
+ appClient.subscribe(name); // 根据itemName订阅
+ // 等待消息
+ RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间,例如10秒
+ if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){
+ RetifiveStockInfo retifiveStockInfo = decode3(refreshMsg.payload().fieldList(), refreshMsg.name());
+ list.add(retifiveStockInfo);
+ }
+ } catch (Exception e) {
+ log.error("获取股票详情link:"+name,e);
+ }
+ }
+ return ServerResponse.createBySuccess("操作成功",list);
+
+ }
+
+
+
+ /* void decode(FieldList fieldList)
+ {
+ for (FieldEntry fieldEntry : fieldList)
+ {
+ System.out.print("Fid: " + fieldEntry.fieldId() + " Name = " + fieldEntry.name() + " DataType: " + DataType.asString(fieldEntry.load().dataType()) + " Value: ");
+
+ if (Data.DataCode.BLANK == fieldEntry.code())
+ System.out.println(" blank");
+ else
+ switch (fieldEntry.loadType())
+ {
+ case DataType.DataTypes.REAL :
+ System.out.println(fieldEntry.real().asDouble());
+ break;
+ case DataType.DataTypes.DATE :
+ System.out.println(fieldEntry.date().day() + " / " + fieldEntry.date().month() + " / " + fieldEntry.date().year());
+ break;
+ case DataType.DataTypes.TIME :
+ System.out.println(fieldEntry.time().hour() + " / " + fieldEntry.time().minute() + " / " + fieldEntry.time().second() + fieldEntry.time().millisecond());
+ break;
+ case DataType.DataTypes.INT :
+ System.out.println(fieldEntry.intValue());
+ break;
+ case DataType.DataTypes.UINT :
+ System.out.println(fieldEntry.uintValue());
+ break;
+ case DataType.DataTypes.ASCII :
+ System.out.println(fieldEntry.ascii());
+ break;
+ case DataType.DataTypes.ENUM :
+ System.out.println(fieldEntry.hasEnumDisplay() ? fieldEntry.enumDisplay() : fieldEntry.enumValue());
+ break;
+ case DataType.DataTypes.RMTES :
+ System.out.println(fieldEntry.rmtes());
+ break;
+ case DataType.DataTypes.ERROR :
+ System.out.println("(" + fieldEntry.error().errorCodeAsString() + ")");
+ break;
+ default :
+ System.out.println();
+ break;
+ }
+ }
+ }*/
+
+ String decode1(FieldList fieldList,String name)
+ {
+ Iterator iter = fieldList.iterator();
+ FieldEntry fieldEntry;
+ String nextLink = "";
+ while (iter.hasNext())
+ {
+ fieldEntry = iter.next();
+ System.out.println("Fid: " + fieldEntry.fieldId() + " Name: " + fieldEntry.name() + " value: " + fieldEntry.load());
+ if(fieldEntry.name().startsWith("LINK_")){
+ if(fieldEntry.loadType() == DataType.DataTypes.ASCII){
+ String symbol = fieldEntry.load().toString();
+ if(!symbol.contains("blank data")){
+ mongoTemplate.insert(new SymbolRefinitiv(symbol,"bse",name));
+ }
+ }
+ }
+ if(fieldEntry.name().startsWith("NEXT_LR")){
+ nextLink = fieldEntry.load().toString();
+ if(nextLink.contains("blank data")){
+ break;
+ }
+ }
+
+ if(fieldEntry.name().startsWith("NEXT_LR")){
+ nextLink = fieldEntry.load().toString();
+ if(nextLink.contains("blank data")){
+ break;
+ }
+ }
+ String stockName = "";
+ if(fieldEntry.name().equals("DSPLY_NAME")){
+ if(fieldEntry.loadType() == DataType.DataTypes.RMTES) {
+ stockName = fieldEntry.load().toString();
+ }
+ }
+ String stockCode = "";
+ if(fieldEntry.name().equals("PROV_SYMB")){
+ if(fieldEntry.loadType() == DataType.DataTypes.RMTES) {
+ stockCode = fieldEntry.load().toString();
+ }
+ }
+
+ String status = "";
+ if(fieldEntry.name().equals("TRD_STATUS")){
+ if(fieldEntry.loadType() == DataType.DataTypes.RMTES) {
+ status = fieldEntry.load().toString();
+ }
+ }
+
+ StockInfoRefinitiv stockInfoRefinitiv = new StockInfoRefinitiv();
+ stockInfoRefinitiv.setStockName(stockName);
+ stockInfoRefinitiv.setStockCode(stockCode);
+ stockInfoRefinitiv.setStatus(status);
+ stockInfoRefinitiv.setStockType("bse");
+ stockInfoRefinitiv.setCommandStr(name);
+ }
+ return nextLink;
+ }
+
+ synchronized void decode2(FieldList fieldList,String name)
+ {
+ List safeList = new CopyOnWriteArrayList<>();
+ for(FieldEntry fieldEntry : fieldList) {
+ safeList.add(fieldEntry); // 假设这里是安全的,但实际上你可能需要根据FieldEntry的实现来确定
+ }
+ String stockName = "";
+ String stockCode = "";
+ String status = "";
+
+ for(FieldEntry fieldEntry : safeList)
+ {
+ System.out.println("Fid: " + fieldEntry.fieldId() + " Name: " + fieldEntry.name() + " value: " + fieldEntry.load());
+ if(fieldEntry.name().startsWith("LINK_")){
+ if(fieldEntry.loadType() == DataType.DataTypes.ASCII){
+ String symbol = fieldEntry.load().toString();
+ if(!symbol.contains("blank data")){
+ mongoTemplate.insert(new SymbolRefinitiv(symbol,"bse",name));
+ }
+ }
+ }
+
+
+ if(fieldEntry.name().equals("DSPLY_NAME")){
+ if(fieldEntry.loadType() == DataType.DataTypes.RMTES) {
+ stockName = fieldEntry.load().toString();
+ }
+ }
+ if(fieldEntry.name().equals("PROV_SYMB")){
+ if(fieldEntry.loadType() == DataType.DataTypes.RMTES) {
+ stockCode = fieldEntry.load().toString();
+ }
+ }
+
+ if(fieldEntry.name().equals("TRD_STATUS")){
+ if(fieldEntry.loadType() == DataType.DataTypes.RMTES) {
+ status = fieldEntry.load().toString();
+ }
+ }
+ }
+
+ StockInfoRefinitiv stockInfoRefinitiv = new StockInfoRefinitiv();
+ stockInfoRefinitiv.setStockName(stockName);
+ stockInfoRefinitiv.setStockCode(stockCode);
+ stockInfoRefinitiv.setStatus(status);
+ stockInfoRefinitiv.setStockType("bse");
+ stockInfoRefinitiv.setCommandStr(name);
+ mongoTemplate.insert(stockInfoRefinitiv);
+ }
+
+
+
+ RetifiveStockInfo decode3(FieldList fieldList, String name)
+ {
+ Iterator iter = fieldList.iterator();
+ FieldEntry fieldEntry;
+ String stockName = "";
+ String stockCode = "";
+ String status = "";
+ String price = "";
+ String openPrice = "";
+ String previousPrice = "";
+ String percentChange = "";
+ String week52High ="";
+ String week52Low = "";
+ String high = "";
+ String low = "";
+ String volume = "";//实时交易数量
+ String stockType = "";
+ while (iter.hasNext())
+ {
+ fieldEntry = iter.next();
+ System.out.println("Fid: " + fieldEntry.fieldId() + " Name: " + fieldEntry.name() +" Unit: "+DataType.asString(fieldEntry.loadType()) + " value: " + fieldEntry.load());
+ if(fieldEntry.name().equals("DSPLY_NAME")){
+ if(fieldEntry.loadType() == DataType.DataTypes.RMTES) {
+ stockName = fieldEntry.load().toString();
+ }
+ }
+ if(fieldEntry.name().equals("PROV_SYMB")){
+ if(fieldEntry.loadType() == DataType.DataTypes.RMTES) {
+ stockCode = fieldEntry.load().toString();
+ }
+ }
+
+ if(fieldEntry.name().equals("INST_PHASE")){
+ if(fieldEntry.loadType() == DataType.DataTypes.ENUM) {
+ status = fieldEntry.load().toString();
+ }
+ }
+
+ if(fieldEntry.name().equals("OPEN_PRC")){
+ if(fieldEntry.loadType() == DataType.DataTypes.REAL) {
+ openPrice = fieldEntry.load().toString();
+ }
+ }
+
+ if(fieldEntry.name().equals("HST_CLOSE")){
+ if(fieldEntry.loadType() == DataType.DataTypes.REAL) {
+ previousPrice = fieldEntry.load().toString();
+ }
+ }
+
+ if(fieldEntry.name().equals("52WK_HIGH")){
+ if(fieldEntry.loadType() == DataType.DataTypes.REAL) {
+ week52High = fieldEntry.load().toString();
+ }
+ }
+
+ if(fieldEntry.name().equals("52WK_LOW")){
+ if(fieldEntry.loadType() == DataType.DataTypes.REAL) {
+ week52Low = fieldEntry.load().toString();
+ }
+ }
+ if(fieldEntry.name().equals("NETCHNG_1")){
+ if(fieldEntry.loadType() == DataType.DataTypes.REAL) {
+ percentChange = fieldEntry.load().toString();
+ }
+ }
+
+ if(fieldEntry.name().equals("HIGH_1")){
+ if(fieldEntry.loadType() == DataType.DataTypes.REAL) {
+ high = fieldEntry.load().toString();
+ }
+ }
+
+ if(fieldEntry.name().equals("LOW_1")){
+ if(fieldEntry.loadType() == DataType.DataTypes.REAL) {
+ low = fieldEntry.load().toString();
+ }
+ }
+ if(fieldEntry.name().equals("IRGVOL")){
+ if(fieldEntry.loadType() == DataType.DataTypes.REAL) {
+ volume = fieldEntry.load().toString();
+ }
+ }
+
+ if(fieldEntry.name().equals("TRDPRC_1")){
+ if(fieldEntry.loadType() == DataType.DataTypes.REAL) {
+ price = fieldEntry.load().toString();
+ }
+ }
+ if(fieldEntry.name().equals("RDN_EXCHID")){
+ if(fieldEntry.loadType() == DataType.DataTypes.ENUM) {
+ stockType = fieldEntry.load().toString();
+ if(StringUtils.equals(stockType,"145")){
+ stockType = "bse";//孟买国家交易所
+ }else if(StringUtils.equals(stockType,"147")){
+ stockType = "nse";//印度国家交易所
+ }
+ }
+ }
+ }
+ RetifiveStockInfo retifiveStockInfo = RetifiveStockInfo.builder().stockCode(stockCode).stockName(stockName).symbol(name).status(status)
+ .openPrice(openPrice).currentPrice(price).highPrice(high).lowPrice(low).previousPrice(previousPrice).changePercent(percentChange)
+ .volume(volume).week52HighPrice(week52High).week52LowPrice(week52Low).stockType(stockType)
+ .build();
+
+ return retifiveStockInfo;
+ }
+
+
+ private static List generateItemNames() {
+ List itemNames = new ArrayList<>();
+
+ // 从字母 'A' 到 'Z'
+ for (char letter = 'A'; letter <= 'Z'; letter++) {
+ String itemName = "0#" + letter + ".BO";
+ itemNames.add(itemName);
+ }
+
+ return itemNames;
+ }
+}
diff --git a/src/main/resources/EmaConfig.xml b/src/main/resources/EmaConfig.xml
new file mode 100644
index 0000000..13c878f
--- /dev/null
+++ b/src/main/resources/EmaConfig.xml
@@ -0,0 +1,898 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+