From 9d3bac8a9d5073f613c83f530097fb8106ccae0a Mon Sep 17 00:00:00 2001 From: Achilles Date: Wed, 10 Apr 2024 13:56:34 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81Retifive?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 12 + .../stock/market/dto/RetifiveStockInfo.java | 32 + .../cn/stock/market/listener/AppClient.java | 84 ++ .../market/listener/ConcurrentAppClient.java | 75 ++ .../cn/stock/market/listener/EmaConfig.xml | 898 ++++++++++++++++++ .../cn/stock/market/listener/MongoConfig.java | 25 + .../cn/stock/market/listener/MyAppRunner.java | 36 + .../market/listener/StockInfoRefinitiv.java | 22 + .../market/listener/SymbolRefinitiv.java | 19 + .../market/web/MessageRetifiveController.java | 488 ++++++++++ src/main/resources/EmaConfig.xml | 898 ++++++++++++++++++ 11 files changed, 2589 insertions(+) create mode 100644 src/main/java/cn/stock/market/dto/RetifiveStockInfo.java create mode 100644 src/main/java/cn/stock/market/listener/AppClient.java create mode 100644 src/main/java/cn/stock/market/listener/ConcurrentAppClient.java create mode 100644 src/main/java/cn/stock/market/listener/EmaConfig.xml create mode 100644 src/main/java/cn/stock/market/listener/MongoConfig.java create mode 100644 src/main/java/cn/stock/market/listener/MyAppRunner.java create mode 100644 src/main/java/cn/stock/market/listener/StockInfoRefinitiv.java create mode 100644 src/main/java/cn/stock/market/listener/SymbolRefinitiv.java create mode 100644 src/main/java/cn/stock/market/web/MessageRetifiveController.java create mode 100644 src/main/resources/EmaConfig.xml diff --git a/pom.xml b/pom.xml index 870d991..13a21d0 100644 --- a/pom.xml +++ b/pom.xml @@ -42,6 +42,11 @@ + + org.springframework.boot + spring-boot-starter-data-mongodb + + com.alibaba druid-spring-boot-starter @@ -185,6 +190,13 @@ 30.1-jre + + com.thomsonreuters.ema + ema + 3.5.1.0 + + + diff --git a/src/main/java/cn/stock/market/dto/RetifiveStockInfo.java b/src/main/java/cn/stock/market/dto/RetifiveStockInfo.java new file mode 100644 index 0000000..8206dff --- /dev/null +++ b/src/main/java/cn/stock/market/dto/RetifiveStockInfo.java @@ -0,0 +1,32 @@ +package cn.stock.market.dto; + +import lombok.Builder; +import lombok.Data; + +/** + * @author gs + * @date 2024/4/9 10:53 + */ +@Data +@Builder +/** + * RetifiveStockInfo类用于存储股票信息。 + * 该类包含了股票的基本信息和交易数据。 + */ +public class RetifiveStockInfo { + private String stockCode; // 股票代码 + private String symbol; // 交易代码 + private String stockName; // 股票名称 + private String stockType; // 股票类型 + private String currentPrice; // 当前价格 + private String changePercent; // 变动百分比 + private String openPrice; // 开盘价格 + private String previousPrice; // 昨日收盘价格 + private String volume; // 成交量 + private String highPrice; // 最高价格 + private String lowPrice; // 最低价格 + private String week52HighPrice; // 52周最高价格 + private String week52LowPrice; // 52周最低价格 + private String status; +} + diff --git a/src/main/java/cn/stock/market/listener/AppClient.java b/src/main/java/cn/stock/market/listener/AppClient.java new file mode 100644 index 0000000..c6f3fb2 --- /dev/null +++ b/src/main/java/cn/stock/market/listener/AppClient.java @@ -0,0 +1,84 @@ +package cn.stock.market.listener; + +import com.thomsonreuters.ema.access.AckMsg; +import com.thomsonreuters.ema.access.EmaFactory; +import com.thomsonreuters.ema.access.GenericMsg; +import com.thomsonreuters.ema.access.Msg; +import com.thomsonreuters.ema.access.OmmConsumer; +import com.thomsonreuters.ema.access.OmmConsumerClient; +import com.thomsonreuters.ema.access.OmmConsumerConfig; +import com.thomsonreuters.ema.access.OmmConsumerEvent; +import com.thomsonreuters.ema.access.RefreshMsg; +import com.thomsonreuters.ema.access.ReqMsg; +import com.thomsonreuters.ema.access.StatusMsg; +import com.thomsonreuters.ema.access.UpdateMsg; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Service; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +@Service +@Slf4j +public class AppClient implements OmmConsumerClient { + + private CompletableFuture messageFuture = new CompletableFuture<>(); + private OmmConsumer consumer; + + + + @Autowired + public AppClient(OmmConsumer consumer) { + this.consumer = consumer; + } + + // 根据itemName进行订阅的方法 + public void subscribe(String itemName) { + resetMessageFuture(); // 重置Future以便新的请求 + ReqMsg reqMsg = EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").name(itemName); + consumer.registerClient(reqMsg, this); + } + + + public CompletableFuture getMessageFuture() { + return messageFuture; + } + + private void resetMessageFuture() { + this.messageFuture = new CompletableFuture<>(); + } + + @Override + public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event) { + log.error("监听的消息:"+refreshMsg.toString()); + messageFuture.complete(refreshMsg); + } + + @Override + public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent consumerEvent) { + + } + + @Override + public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent consumerEvent) { + + } + + @Override + public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent) { + + } + + @Override + public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent) { + + } + + @Override + public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent) { + + } + // 其他消息处理... +} diff --git a/src/main/java/cn/stock/market/listener/ConcurrentAppClient.java b/src/main/java/cn/stock/market/listener/ConcurrentAppClient.java new file mode 100644 index 0000000..108b4a5 --- /dev/null +++ b/src/main/java/cn/stock/market/listener/ConcurrentAppClient.java @@ -0,0 +1,75 @@ +package cn.stock.market.listener; + +import com.thomsonreuters.ema.access.AckMsg; +import com.thomsonreuters.ema.access.EmaFactory; +import com.thomsonreuters.ema.access.GenericMsg; +import com.thomsonreuters.ema.access.Msg; +import com.thomsonreuters.ema.access.OmmConsumer; +import com.thomsonreuters.ema.access.OmmConsumerClient; +import com.thomsonreuters.ema.access.OmmConsumerEvent; +import com.thomsonreuters.ema.access.RefreshMsg; +import com.thomsonreuters.ema.access.ReqMsg; +import com.thomsonreuters.ema.access.StatusMsg; +import com.thomsonreuters.ema.access.UpdateMsg; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author gs + * @date 2024/4/8 22:01 + */ +@Service +public class ConcurrentAppClient implements OmmConsumerClient { + private final ConcurrentHashMap> futures = new ConcurrentHashMap<>(); + private OmmConsumer consumer; + + @Autowired + public ConcurrentAppClient(OmmConsumer consumer) { + this.consumer = consumer; + } + + public CompletableFuture subscribe(String itemName) { + CompletableFuture future = new CompletableFuture<>(); + futures.put(itemName, future); + ReqMsg reqMsg = EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").name(itemName); + consumer.registerClient(reqMsg, this); + return future; + } + + @Override + public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event) { + String itemName = refreshMsg.name(); + CompletableFuture future = futures.remove(itemName); + if (future != null) { + future.complete(refreshMsg); + } + } + + @Override + public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent consumerEvent) { + + } + + @Override + public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent consumerEvent) { + + } + + @Override + public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent) { + + } + + @Override + public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent) { + + } + + @Override + public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent) { + + } +} diff --git a/src/main/java/cn/stock/market/listener/EmaConfig.xml b/src/main/java/cn/stock/market/listener/EmaConfig.xml new file mode 100644 index 0000000..13c878f --- /dev/null +++ b/src/main/java/cn/stock/market/listener/EmaConfig.xmldiff --git a/src/main/java/cn/stock/market/listener/MongoConfig.java b/src/main/java/cn/stock/market/listener/MongoConfig.java new file mode 100644 index 0000000..ea57c0f --- /dev/null +++ b/src/main/java/cn/stock/market/listener/MongoConfig.java @@ -0,0 +1,25 @@ +package cn.stock.market.listener; + +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoClient; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.mongodb.core.MongoTemplate; + +@Configuration +public class MongoConfig { + + @Bean + public MongoClient mongoClient() { + // 连接到MongoDB服务器(示例中使用默认的MongoDB端口) + // 请根据你的实际情况修改这里的连接字符串 + return MongoClients.create("mongodb://root:123456@localhost:27017"); + } + + @Bean + public MongoTemplate mongoTemplate() { + // 创建并返回MongoTemplate的实例,需要提供MongoClient实例和数据库名 + // 替换"yourDatabase"为你的数据库名 + return new MongoTemplate(mongoClient(), "company"); + } +} diff --git a/src/main/java/cn/stock/market/listener/MyAppRunner.java b/src/main/java/cn/stock/market/listener/MyAppRunner.java new file mode 100644 index 0000000..3d0d71f --- /dev/null +++ b/src/main/java/cn/stock/market/listener/MyAppRunner.java @@ -0,0 +1,36 @@ +package cn.stock.market.listener; + +import com.thomsonreuters.ema.access.EmaFactory; +import com.thomsonreuters.ema.access.OmmConsumer; +import com.thomsonreuters.ema.access.OmmConsumerConfig; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +@Component +public class MyAppRunner implements ApplicationRunner { + static String userName = "GE-A-10288435-3-15856"; + static String password = "8EyCWDQ%XITcGRM@RhKokxX05FZJe1"; + static String clientId = "662b37b071144c9f8f2507d93037a019a010ae0e"; + @Override + public void run(ApplicationArguments args) throws Exception { + // 在这里写入您的初始化代码 + System.out.println("应用已启动,执行初始化代码..."); + + // The "Consumer_4" uses EncryptedProtocolType::RSSL_SOCKET while the "Consumer_5" uses EncryptedProtocolType::RSSL_WEBSOCKET predefined in EmaConfig.xml + } + + @Bean + public OmmConsumer getOmmConsumer() { + OmmConsumer consumer = null; + OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig(); + config.username(userName); + config.password(password); + config.clientId(clientId); + consumer = EmaFactory.createOmmConsumer(config.consumerName( "Consumer_4")); + return consumer; + } + + +} diff --git a/src/main/java/cn/stock/market/listener/StockInfoRefinitiv.java b/src/main/java/cn/stock/market/listener/StockInfoRefinitiv.java new file mode 100644 index 0000000..dbc5254 --- /dev/null +++ b/src/main/java/cn/stock/market/listener/StockInfoRefinitiv.java @@ -0,0 +1,22 @@ +package cn.stock.market.listener; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.data.mongodb.core.mapping.Document; + +/** + * @author gs + * @date 2024/4/8 21:00 + */ +@Document("stock_info") +@Data +@AllArgsConstructor +@NoArgsConstructor +public class StockInfoRefinitiv { + private String stockName; + private String stockCode; + private String stockType; + private String status; + private String commandStr; +} diff --git a/src/main/java/cn/stock/market/listener/SymbolRefinitiv.java b/src/main/java/cn/stock/market/listener/SymbolRefinitiv.java new file mode 100644 index 0000000..a51228a --- /dev/null +++ b/src/main/java/cn/stock/market/listener/SymbolRefinitiv.java @@ -0,0 +1,19 @@ +package cn.stock.market.listener; + +import lombok.AllArgsConstructor; +import lombok.Data; +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; + +/** + * @author gs + * @date 2024/4/8 17:20 + */ +@Data +@AllArgsConstructor +@Document("symbol") +public class SymbolRefinitiv { + private String symbol; + private String stockType; + private String commandStr; +} diff --git a/src/main/java/cn/stock/market/web/MessageRetifiveController.java b/src/main/java/cn/stock/market/web/MessageRetifiveController.java new file mode 100644 index 0000000..da4a91d --- /dev/null +++ b/src/main/java/cn/stock/market/web/MessageRetifiveController.java @@ -0,0 +1,488 @@ +package cn.stock.market.web; + +import cn.stock.market.dto.RetifiveStockInfo; +import cn.stock.market.listener.AppClient; +import cn.stock.market.listener.ConcurrentAppClient; +import cn.stock.market.listener.StockInfoRefinitiv; +import cn.stock.market.listener.SymbolRefinitiv; +import cn.stock.market.utils.ServerResponse; +import com.google.common.collect.Lists; +import com.thomsonreuters.ema.access.DataType; +import com.thomsonreuters.ema.access.FieldEntry; +import com.thomsonreuters.ema.access.FieldList; +import com.thomsonreuters.ema.access.Map; +import com.thomsonreuters.ema.access.MapEntry; +import com.thomsonreuters.ema.access.RefreshMsg; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@RestController +@Slf4j +@RequestMapping("/retifive/**") +public class MessageRetifiveController { + + @Autowired + private AppClient appClient; + + @Autowired + private ConcurrentAppClient concurrentAppClient; + + @Autowired + private MongoTemplate mongoTemplate; + + @GetMapping("/message") + public ServerResponse getMessage(@RequestParam String itemName) { + RefreshMsg refreshMsg = null; + try { + appClient.subscribe(itemName); // 根据itemName订阅 + // 等待消息 + refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间,例如10秒 + return ServerResponse.createBySuccess("操作成功",refreshMsg.toString()); + } catch (Exception e) { + return ServerResponse.createByError(); + } + } + + @GetMapping("/getStockSymbolList") + public ServerResponse getStockList() { + RefreshMsg refreshMsg = null; + List itemNameList = generateItemNames(); + String nextLink = ""; + for (String itemName : itemNameList) { + try { + appClient.subscribe(itemName); // 根据itemName订阅 + // 等待消息 + refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间,例如10秒 + System.out.println(refreshMsg.name()); + if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){ + nextLink = decode1(refreshMsg.payload().fieldList(),refreshMsg.name()); + while (StringUtils.isNotBlank(nextLink)&&!nextLink.contains("blank data")){ + appClient.subscribe(nextLink); // 根据itemName订阅 + // 等待消息 + refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间,例如10秒 + nextLink = decode1(refreshMsg.payload().fieldList(),refreshMsg.name()); + } + } + } catch (Exception e) { + log.error("获取列表异常link:"+nextLink,e); + return ServerResponse.createByError(); + } + } + return ServerResponse.createBySuccess("操作成功"); + } + + + /* @GetMapping("/getStockInfoList") + @Deprecated + public ServerResponse getStockInfoList() { + ExecutorService executorService = Executors.newFixedThreadPool(5); // 创建一个固定大小的线程池 + List collect = mongoTemplate.query(SymbolRefinitiv.class).stream().collect(Collectors.toList()); + List collect2 = mongoTemplate.query(StockInfoRefinitiv.class).stream().collect(Collectors.toList()); + + List difference = collect.stream() + .filter(symbol -> collect2.stream() + .noneMatch(stockInfo -> stockInfo.getCommandStr().equals(symbol.getSymbol()))) + .collect(Collectors.toList()); + + List itemNameList = difference.stream().map(SymbolRefinitiv::getSymbol).collect(Collectors.toList()); + + // 计算每个线程应处理的元素数量 + int size = itemNameList.size(); + int chunkSize = size / 5; // 假设列表可以均匀分配 + for (int i = 0; i < 5; i++) { + int start = i * chunkSize; + int end = (i == 4) ? size : (start + chunkSize); // 最后一个线程可能需要处理更多的元素 + List sublist = itemNameList.subList(start, end); + // 将子列表的处理提交给线程池 + executorService.submit(() -> { + for (String itemName : sublist) { + try { + CompletableFuture future = concurrentAppClient.subscribe(itemName); + RefreshMsg refreshMsg = future.get(10, TimeUnit.SECONDS); + if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){ + // List safeList = Collections.synchronizedList(new ArrayList<>(refreshMsg.payload().fieldList())); + decode2(refreshMsg.payload().fieldList(),refreshMsg.name()); + } + } catch (Exception e) { + log.error("获取列表异常link:"+itemName,e); + } + } + }); + } + + executorService.shutdown(); + try { + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException ie) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + + return ServerResponse.createBySuccess("操作成功"); + }*/ + + + @GetMapping("/getStockInfoList2") + public ServerResponse getStockInfoList2() { + List collect = mongoTemplate.query(SymbolRefinitiv.class).stream().map(SymbolRefinitiv::getSymbol).collect(Collectors.toList()); + + // 计算每个线程应处理的元素数量 + for (String itemName : collect) { + try { + appClient.subscribe(itemName); // 根据itemName订阅 + // 等待消息 + RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间,例如10秒 + if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){ + // List safeList = Collections.synchronizedList(new ArrayList<>(refreshMsg.payload().fieldList())); + decode2(refreshMsg.payload().fieldList(),refreshMsg.name()); + } + } catch (Exception e) { + log.error("获取列表异常link:"+itemName,e); + } + } + + return ServerResponse.createBySuccess("操作成功"); + } + + @GetMapping("/getStockDetail") + public ServerResponse getStockInfoList2(String itemName) { + + // 计算每个线程应处理的元素数量 + try { + appClient.subscribe(itemName); // 根据itemName订阅 + // 等待消息 + RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间,例如10秒 + if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){ + RetifiveStockInfo retifiveStockInfo = decode3(refreshMsg.payload().fieldList(), refreshMsg.name()); + return ServerResponse.createBySuccess("操作成功",retifiveStockInfo); + } + } catch (Exception e) { + log.error("获取股票详情link:"+itemName,e); + } + + return ServerResponse.createBySuccess("操作成功"); + } + + + @GetMapping("/getStockDetailList") + public ServerResponse getStockDetailList(String itemName) { + + // 计算每个线程应处理的元素数量 + String[] itemNames = itemName.split(","); + List list = Lists.newArrayList(); + for (String name : itemNames) { + try { + appClient.subscribe(name); // 根据itemName订阅 + // 等待消息 + RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间,例如10秒 + if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){ + RetifiveStockInfo retifiveStockInfo = decode3(refreshMsg.payload().fieldList(), refreshMsg.name()); + list.add(retifiveStockInfo); + } + } catch (Exception e) { + log.error("获取股票详情link:"+name,e); + } + } + return ServerResponse.createBySuccess("操作成功",list); + + } + + + + /* void decode(FieldList fieldList) + { + for (FieldEntry fieldEntry : fieldList) + { + System.out.print("Fid: " + fieldEntry.fieldId() + " Name = " + fieldEntry.name() + " DataType: " + DataType.asString(fieldEntry.load().dataType()) + " Value: "); + + if (Data.DataCode.BLANK == fieldEntry.code()) + System.out.println(" blank"); + else + switch (fieldEntry.loadType()) + { + case DataType.DataTypes.REAL : + System.out.println(fieldEntry.real().asDouble()); + break; + case DataType.DataTypes.DATE : + System.out.println(fieldEntry.date().day() + " / " + fieldEntry.date().month() + " / " + fieldEntry.date().year()); + break; + case DataType.DataTypes.TIME : + System.out.println(fieldEntry.time().hour() + " / " + fieldEntry.time().minute() + " / " + fieldEntry.time().second() + fieldEntry.time().millisecond()); + break; + case DataType.DataTypes.INT : + System.out.println(fieldEntry.intValue()); + break; + case DataType.DataTypes.UINT : + System.out.println(fieldEntry.uintValue()); + break; + case DataType.DataTypes.ASCII : + System.out.println(fieldEntry.ascii()); + break; + case DataType.DataTypes.ENUM : + System.out.println(fieldEntry.hasEnumDisplay() ? fieldEntry.enumDisplay() : fieldEntry.enumValue()); + break; + case DataType.DataTypes.RMTES : + System.out.println(fieldEntry.rmtes()); + break; + case DataType.DataTypes.ERROR : + System.out.println("(" + fieldEntry.error().errorCodeAsString() + ")"); + break; + default : + System.out.println(); + break; + } + } + }*/ + + String decode1(FieldList fieldList,String name) + { + Iterator iter = fieldList.iterator(); + FieldEntry fieldEntry; + String nextLink = ""; + while (iter.hasNext()) + { + fieldEntry = iter.next(); + System.out.println("Fid: " + fieldEntry.fieldId() + " Name: " + fieldEntry.name() + " value: " + fieldEntry.load()); + if(fieldEntry.name().startsWith("LINK_")){ + if(fieldEntry.loadType() == DataType.DataTypes.ASCII){ + String symbol = fieldEntry.load().toString(); + if(!symbol.contains("blank data")){ + mongoTemplate.insert(new SymbolRefinitiv(symbol,"bse",name)); + } + } + } + if(fieldEntry.name().startsWith("NEXT_LR")){ + nextLink = fieldEntry.load().toString(); + if(nextLink.contains("blank data")){ + break; + } + } + + if(fieldEntry.name().startsWith("NEXT_LR")){ + nextLink = fieldEntry.load().toString(); + if(nextLink.contains("blank data")){ + break; + } + } + String stockName = ""; + if(fieldEntry.name().equals("DSPLY_NAME")){ + if(fieldEntry.loadType() == DataType.DataTypes.RMTES) { + stockName = fieldEntry.load().toString(); + } + } + String stockCode = ""; + if(fieldEntry.name().equals("PROV_SYMB")){ + if(fieldEntry.loadType() == DataType.DataTypes.RMTES) { + stockCode = fieldEntry.load().toString(); + } + } + + String status = ""; + if(fieldEntry.name().equals("TRD_STATUS")){ + if(fieldEntry.loadType() == DataType.DataTypes.RMTES) { + status = fieldEntry.load().toString(); + } + } + + StockInfoRefinitiv stockInfoRefinitiv = new StockInfoRefinitiv(); + stockInfoRefinitiv.setStockName(stockName); + stockInfoRefinitiv.setStockCode(stockCode); + stockInfoRefinitiv.setStatus(status); + stockInfoRefinitiv.setStockType("bse"); + stockInfoRefinitiv.setCommandStr(name); + } + return nextLink; + } + + synchronized void decode2(FieldList fieldList,String name) + { + List safeList = new CopyOnWriteArrayList<>(); + for(FieldEntry fieldEntry : fieldList) { + safeList.add(fieldEntry); // 假设这里是安全的,但实际上你可能需要根据FieldEntry的实现来确定 + } + String stockName = ""; + String stockCode = ""; + String status = ""; + + for(FieldEntry fieldEntry : safeList) + { + System.out.println("Fid: " + fieldEntry.fieldId() + " Name: " + fieldEntry.name() + " value: " + fieldEntry.load()); + if(fieldEntry.name().startsWith("LINK_")){ + if(fieldEntry.loadType() == DataType.DataTypes.ASCII){ + String symbol = fieldEntry.load().toString(); + if(!symbol.contains("blank data")){ + mongoTemplate.insert(new SymbolRefinitiv(symbol,"bse",name)); + } + } + } + + + if(fieldEntry.name().equals("DSPLY_NAME")){ + if(fieldEntry.loadType() == DataType.DataTypes.RMTES) { + stockName = fieldEntry.load().toString(); + } + } + if(fieldEntry.name().equals("PROV_SYMB")){ + if(fieldEntry.loadType() == DataType.DataTypes.RMTES) { + stockCode = fieldEntry.load().toString(); + } + } + + if(fieldEntry.name().equals("TRD_STATUS")){ + if(fieldEntry.loadType() == DataType.DataTypes.RMTES) { + status = fieldEntry.load().toString(); + } + } + } + + StockInfoRefinitiv stockInfoRefinitiv = new StockInfoRefinitiv(); + stockInfoRefinitiv.setStockName(stockName); + stockInfoRefinitiv.setStockCode(stockCode); + stockInfoRefinitiv.setStatus(status); + stockInfoRefinitiv.setStockType("bse"); + stockInfoRefinitiv.setCommandStr(name); + mongoTemplate.insert(stockInfoRefinitiv); + } + + + + RetifiveStockInfo decode3(FieldList fieldList, String name) + { + Iterator iter = fieldList.iterator(); + FieldEntry fieldEntry; + String stockName = ""; + String stockCode = ""; + String status = ""; + String price = ""; + String openPrice = ""; + String previousPrice = ""; + String percentChange = ""; + String week52High =""; + String week52Low = ""; + String high = ""; + String low = ""; + String volume = "";//实时交易数量 + String stockType = ""; + while (iter.hasNext()) + { + fieldEntry = iter.next(); + System.out.println("Fid: " + fieldEntry.fieldId() + " Name: " + fieldEntry.name() +" Unit: "+DataType.asString(fieldEntry.loadType()) + " value: " + fieldEntry.load()); + if(fieldEntry.name().equals("DSPLY_NAME")){ + if(fieldEntry.loadType() == DataType.DataTypes.RMTES) { + stockName = fieldEntry.load().toString(); + } + } + if(fieldEntry.name().equals("PROV_SYMB")){ + if(fieldEntry.loadType() == DataType.DataTypes.RMTES) { + stockCode = fieldEntry.load().toString(); + } + } + + if(fieldEntry.name().equals("INST_PHASE")){ + if(fieldEntry.loadType() == DataType.DataTypes.ENUM) { + status = fieldEntry.load().toString(); + } + } + + if(fieldEntry.name().equals("OPEN_PRC")){ + if(fieldEntry.loadType() == DataType.DataTypes.REAL) { + openPrice = fieldEntry.load().toString(); + } + } + + if(fieldEntry.name().equals("HST_CLOSE")){ + if(fieldEntry.loadType() == DataType.DataTypes.REAL) { + previousPrice = fieldEntry.load().toString(); + } + } + + if(fieldEntry.name().equals("52WK_HIGH")){ + if(fieldEntry.loadType() == DataType.DataTypes.REAL) { + week52High = fieldEntry.load().toString(); + } + } + + if(fieldEntry.name().equals("52WK_LOW")){ + if(fieldEntry.loadType() == DataType.DataTypes.REAL) { + week52Low = fieldEntry.load().toString(); + } + } + if(fieldEntry.name().equals("NETCHNG_1")){ + if(fieldEntry.loadType() == DataType.DataTypes.REAL) { + percentChange = fieldEntry.load().toString(); + } + } + + if(fieldEntry.name().equals("HIGH_1")){ + if(fieldEntry.loadType() == DataType.DataTypes.REAL) { + high = fieldEntry.load().toString(); + } + } + + if(fieldEntry.name().equals("LOW_1")){ + if(fieldEntry.loadType() == DataType.DataTypes.REAL) { + low = fieldEntry.load().toString(); + } + } + if(fieldEntry.name().equals("IRGVOL")){ + if(fieldEntry.loadType() == DataType.DataTypes.REAL) { + volume = fieldEntry.load().toString(); + } + } + + if(fieldEntry.name().equals("TRDPRC_1")){ + if(fieldEntry.loadType() == DataType.DataTypes.REAL) { + price = fieldEntry.load().toString(); + } + } + if(fieldEntry.name().equals("RDN_EXCHID")){ + if(fieldEntry.loadType() == DataType.DataTypes.ENUM) { + stockType = fieldEntry.load().toString(); + if(StringUtils.equals(stockType,"145")){ + stockType = "bse";//孟买国家交易所 + }else if(StringUtils.equals(stockType,"147")){ + stockType = "nse";//印度国家交易所 + } + } + } + } + RetifiveStockInfo retifiveStockInfo = RetifiveStockInfo.builder().stockCode(stockCode).stockName(stockName).symbol(name).status(status) + .openPrice(openPrice).currentPrice(price).highPrice(high).lowPrice(low).previousPrice(previousPrice).changePercent(percentChange) + .volume(volume).week52HighPrice(week52High).week52LowPrice(week52Low).stockType(stockType) + .build(); + + return retifiveStockInfo; + } + + + private static List generateItemNames() { + List itemNames = new ArrayList<>(); + + // 从字母 'A' 到 'Z' + for (char letter = 'A'; letter <= 'Z'; letter++) { + String itemName = "0#" + letter + ".BO"; + itemNames.add(itemName); + } + + return itemNames; + } +} diff --git a/src/main/resources/EmaConfig.xml b/src/main/resources/EmaConfig.xml new file mode 100644 index 0000000..13c878f --- /dev/null +++ b/src/main/resources/EmaConfig.xml