diff --git a/src/main/java/cn/stock/market/infrastructure/job/RefinitivTask.java b/src/main/java/cn/stock/market/infrastructure/job/RefinitivTask.java new file mode 100644 index 0000000..e008005 --- /dev/null +++ b/src/main/java/cn/stock/market/infrastructure/job/RefinitivTask.java @@ -0,0 +1,108 @@ +package cn.stock.market.infrastructure.job; + +import cn.hutool.core.collection.CollUtil; +import cn.stock.market.domain.basic.entity.RetifiveStock; +import cn.stock.market.domain.basic.service.RetifiveStockService; +import cn.stock.market.lesg.InstrumentData; +import cn.stock.market.lesg.RefinitivConsumer; +import com.google.common.base.Stopwatch; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Slf4j +@Component +@RestController +public class RefinitivTask { + + @Autowired + private RefinitivConsumer refinitivConsumer; + @Autowired + private RetifiveStockService retifiveStockService; + + @GetMapping("/task/syncStockData") + @Scheduled(cron = "0 0 18 * * ?") + public void syncStockData() { + log.info("每天18点定时同步Refinitiv股票数据开始"); + Stopwatch stopwatch = Stopwatch.createStarted(); + // 获取股票symbol列表 + List symbolList = getSymbolList(); + // 查询出数据库中所有股票 + List allRetifiveStocks = retifiveStockService.repository().findAll(); + List savedSymbols = allRetifiveStocks.stream().map(RetifiveStock::getSymbol).collect(Collectors.toList()); + // 去除数据库中已经存在的股票 + symbolList.removeAll(savedSymbols); + if (CollUtil.isEmpty(symbolList)) { + log.error("定时同步Refinitiv股票数据获取symbol为空"); + return; + } + + try { + List stockList = refinitivConsumer.getRefinitivStockList(String.join(",", symbolList)); + retifiveStockService.repository().saveAll(stockList); + log.info("每天18点定时同步Refinitiv股票数据结束, 受影响数{}, 耗时:{}毫秒", symbolList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } catch (Exception e) { + log.error("每天18点定时同步Refinitiv股票数据异常, 错误:", e); + } + } + + private List getSymbolList() { + List symbols = new ArrayList<>(); + List itemNameList = generateItemNames(); + for (String itemName : itemNameList) { + try { + // 等待消息 + InstrumentData[] dataBySymbol = refinitivConsumer.getDataBySymbol(itemName); + String nextLink = decode(dataBySymbol[0].getDataMap(), symbols); + while (StringUtils.isNotBlank(nextLink) && !nextLink.contains("blank data")) { + dataBySymbol = refinitivConsumer.getDataBySymbol(nextLink); + nextLink = decode(dataBySymbol[0].getDataMap(), symbols); + } + } catch (Exception e) { + log.error("每天18点定时同步Refinitiv股票数据出错,数据未查出,symbol = {}", itemName, e); + } + } + + return symbols; + } + + private String decode(Map dataMap, List symbols) { + String nextLink = null; + for (Map.Entry entry : dataMap.entrySet()) { + if (entry.getKey().startsWith("LINK_") && !entry.getValue().contains("blank data")) { + symbols.add(entry.getValue()); + } + + if (entry.getKey().startsWith("NEXT_LR")) { + nextLink = entry.getValue(); + if (nextLink.contains("blank data")) { + break; + } + } + } + + return nextLink; + } + + private static List generateItemNames() { + List itemNames = new ArrayList<>(); + + // 从字母 'A' 到 'Z' + for (char letter = 'A'; letter <= 'Z'; letter++) { + String itemName = "0#" + letter + ".BO"; + itemNames.add(itemName); + } + + return itemNames; + } +} diff --git a/src/main/java/cn/stock/market/lesg/RefinitivConsumer.java b/src/main/java/cn/stock/market/lesg/RefinitivConsumer.java index 817088f..4d050ea 100644 --- a/src/main/java/cn/stock/market/lesg/RefinitivConsumer.java +++ b/src/main/java/cn/stock/market/lesg/RefinitivConsumer.java @@ -2,6 +2,7 @@ package cn.stock.market.lesg; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; +import cn.stock.market.domain.basic.entity.RetifiveStock; import cn.stock.market.dto.RetifiveStockInfo; import cn.stock.market.utils.RefinitivUtil; import com.refinitiv.ema.access.*; @@ -135,6 +136,26 @@ public class RefinitivConsumer implements ApplicationRunner { return list; } + /** + * 获取多个股票详情 + * + * @param symbol ric + * @return 结果 + * @throws Exception e + */ + public List getRefinitivStockList(String symbol) throws Exception { + List list = new ArrayList<>(); + InstrumentData[] result = getDataBySymbol(symbol); + for (InstrumentData instrumentData : result) { + if (!isInvalidResult(instrumentData)) { + RetifiveStock retifiveStock = RefinitivUtil.decodeRefinitivStock(instrumentData); + list.add(retifiveStock); + } + } + + return list; + } + /** * 获取多个推荐股票详情 * @@ -143,19 +164,13 @@ public class RefinitivConsumer implements ApplicationRunner { * @throws Exception e */ public List getTopStockList(String symbol) throws Exception { - List list = new ArrayList<>(); InstrumentData[] result = getDataBySymbol(symbol); if (result.length == 0 || isInvalidResult(result[0])) { - return list; + return new ArrayList<>(); } List topSymbols = RefinitivUtil.decodeTopData(result[0]); - for (String topSymbol : topSymbols) { - RetifiveStockInfo stockInfo = getDetail(topSymbol); - list.add(stockInfo); - } - - return list; + return getStockList(String.join(",", topSymbols)); } /** diff --git a/src/main/java/cn/stock/market/utils/RefinitivUtil.java b/src/main/java/cn/stock/market/utils/RefinitivUtil.java index f230916..d712697 100644 --- a/src/main/java/cn/stock/market/utils/RefinitivUtil.java +++ b/src/main/java/cn/stock/market/utils/RefinitivUtil.java @@ -1,6 +1,7 @@ package cn.stock.market.utils; import cn.hutool.core.util.StrUtil; +import cn.stock.market.domain.basic.entity.RetifiveStock; import cn.stock.market.dto.RetifiveStockInfo; import cn.stock.market.lesg.InstrumentData; import com.google.common.collect.Lists; @@ -367,6 +368,39 @@ public class RefinitivUtil { return handleData(stockInfo); } + + /** + * 解析为RetifiveStock + * + * @param data 查出的数据 + * @return 结果 + */ + public static RetifiveStock decodeRefinitivStock(InstrumentData data) { + Map dataMap = data.getDataMap(); + RetifiveStock stock= RetifiveStock.builder().symbol(data.getRic()).saveTime(new Date()).build(); + dataMap.forEach((k, v) -> { + if (k.equals("DSPLY_NAME")) { + stock.setStockName(v); + } + if (k.equals("PROV_SYMB")) { + stock.setStockCode(v); + } + if (k.equals("RDN_EXCHID")) { + String stockType = v; + if (StrUtil.equals(stockType, "145")) { + //孟买国家交易所 + stockType = "bse"; + } else if (StrUtil.equals(stockType, "147")) { + //印度国家交易所 + stockType = "nse"; + } + stock.setStockType(stockType); + } + }); + + return stock; + } + public static List decodeTopData(InstrumentData instrumentData) { List list = new ArrayList<>(100); instrumentData.getDataMap().forEach((k, v) -> {