定时同步Refinitiv股票数据IN/CHANGE

This commit is contained in:
zhangjian
2024-05-11 08:46:19 +08:00
parent 7ea8b2dc07
commit 29ae06447e
8 changed files with 480 additions and 50 deletions

View File

@@ -0,0 +1,13 @@
package cn.stock.market.constant;
/**
* @author Administrator
*/
public enum InChangeDataTypeEnum {
/**
* InChangeDataTypeEnum
*/
DEL, ADD, WAS, NOW;
}

View File

@@ -0,0 +1,46 @@
package cn.stock.market.constant;
/**
* @author Administrator
*/
public enum InChangeType {
/**
* InChangeType
*/
ALL(1, "所有数据"),
TODAY(2, "今日数据"),
;
int code;
String remark;
InChangeType(int code, String remark) {
this.code = code;
this.remark = remark;
}
public int code() {
return code;
}
public String remark() {
return remark;
}
public static String remark(int code) {
InChangeType of = of(code);
return of == null ? "" : of.remark;
}
public static InChangeType of(int code) {
InChangeType[] values = InChangeType.values();
for (InChangeType e : values) {
if (e.code == code) {
return e;
}
}
return null;
}
}

View File

@@ -0,0 +1,56 @@
package cn.stock.market.constant;
/**
* @author Administrator
*/
public enum MonthEnum {
/**
* MonthEnum
*/
JAN("JAN", "01"),
FEB("FEB", "02"),
MAR("MAR", "03"),
APR("APR", "04"),
MAY("MAY", "05"),
JUN("JUN", "06"),
JUL("JUL", "07"),
AUG("AUG", "08"),
SEP("SEP", "09"),
OCT("OCT", "10"),
NOV("NOV", "11"),
DEC("DEC", "12"),
;
String code;
String remark;
MonthEnum(String code, String remark) {
this.code = code;
this.remark = remark;
}
public String code() {
return code;
}
public String remark() {
return remark;
}
public static String remark(String code) {
MonthEnum of = of(code);
return of == null ? "" : of.remark;
}
public static MonthEnum of(String code) {
MonthEnum[] values = MonthEnum.values();
for (MonthEnum e : values) {
if (e.code.equalsIgnoreCase(code)) {
return e;
}
}
return null;
}
}

View File

@@ -60,6 +60,11 @@ public class RetifiveStockPO {
* 保存时间 */
Date saveTime;
/**
* 更新时间:退市或有修改时会进行更新
*/
Date updateTime;
/**
* 是否锁定 0否 1是 */
Integer isLock;
@@ -67,4 +72,9 @@ public class RetifiveStockPO {
/**
* 是否展示 0是 1否 */
Integer isShow;
/**
* 是否退市0-否1-是
*/
Integer isDelisted;
}

View File

@@ -1,20 +1,23 @@
package cn.stock.market.infrastructure.job;
import cn.hutool.core.collection.CollUtil;
import cn.stock.market.constant.InChangeDataTypeEnum;
import cn.stock.market.constant.InChangeType;
import cn.stock.market.domain.basic.entity.RetifiveStock;
import cn.stock.market.domain.basic.service.RetifiveStockService;
import cn.stock.market.lesg.InstrumentData;
import cn.stock.market.lesg.InChangeEntity;
import cn.stock.market.lesg.RefinitivConsumer;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -24,8 +27,6 @@ import java.util.stream.Collectors;
@Component
@RestController
public class RefinitivTask {
private static final String SYMBOL_LINK_PREFIX = "#EQ.BO";
@Autowired
private RefinitivConsumer refinitivConsumer;
@Autowired
@@ -33,11 +34,12 @@ public class RefinitivTask {
@GetMapping("/task/syncStockData")
@Scheduled(cron = "0 0 18 * * ?")
@Transactional(rollbackFor = Exception.class)
public void syncStockData() {
log.info("每天18点定时同步Refinitiv股票数据开始");
Stopwatch stopwatch = Stopwatch.createStarted();
// 获取股票symbol列表
List<String> symbolList = getSymbolList();
List<String> symbolList = refinitivConsumer.getSymbolList();
// 查询出数据库中所有股票
List<RetifiveStock> allRetifiveStocks = retifiveStockService.repository().findAll();
List<String> savedSymbols = allRetifiveStocks.stream().map(RetifiveStock::getSymbol).collect(Collectors.toList());
@@ -57,41 +59,77 @@ public class RefinitivTask {
}
}
private List<String> getSymbolList() {
List<String> symbols = new ArrayList<>();
int seq = 0;
String symbolLink = seq + SYMBOL_LINK_PREFIX;
String nextLink = null;
@GetMapping("/task/syncInChange")
@Scheduled(cron = "0 0 6 * * ?")
@Transactional(rollbackFor = Exception.class)
public void syncInChange() {
log.info("每天6点定时同步Refinitiv股票数据IN/CHANGE开始");
InChangeType inChangeType = InChangeType.TODAY;
Stopwatch stopwatch = Stopwatch.createStarted();
// 获取股票symbol列表
Map<InChangeDataTypeEnum, List<InChangeEntity>> inChangeSymbolList = refinitivConsumer.getInChangeSymbolList(inChangeType);
// 过滤出要删除的数据
List<InChangeEntity> delList = inChangeSymbolList.get(InChangeDataTypeEnum.DEL);
List<String> delSymbols = delList.stream().map(InChangeEntity::getRic).collect(Collectors.toList());
// List<InChangeEntity> addList = inChangeSymbolList.get(InChangeDataTypeEnum.ADD);
// List<String> addSymbols = addList.stream().map(InChangeEntity::getRic).collect(Collectors.toList());
// 过滤出要更新的数据
List<InChangeEntity> updateList = inChangeSymbolList.get(InChangeDataTypeEnum.WAS);
List<String> wasSymbols = new ArrayList<>();
for (int i = 0; i < updateList.size(); i++) {
if (i % 2 == 0) {
wasSymbols.add(updateList.get(i).getRic());
}
}
// 查询出数据库中所有股票
List<RetifiveStock> allRetifiveStocks = retifiveStockService.repository().findAll();
List<String> savedSymbols = allRetifiveStocks.stream().map(RetifiveStock::getSymbol).collect(Collectors.toList());
// 取交集过滤出需要删除的股票
delSymbols.retainAll(savedSymbols);
// addSymbols.removeAll(savedSymbols);
// 取交集过滤出需要更新的股票
wasSymbols.retainAll(savedSymbols);
try {
InstrumentData[] dataBySymbol = refinitivConsumer.getDataBySymbol(symbolLink);
nextLink = decode(dataBySymbol[0].getDataMap(), symbols);
while (StringUtils.isNotBlank(nextLink) && !nextLink.contains("blank data")) {
dataBySymbol = refinitivConsumer.getDataBySymbol(nextLink);
nextLink = decode(dataBySymbol[0].getDataMap(), symbols);
// 执行删除操作
List<RetifiveStock> delStocks = allRetifiveStocks.stream()
.filter(item -> delSymbols.contains(item.getSymbol()))
.filter(item -> item.getIsDelisted() == 0)
.collect(Collectors.toList());
if (CollUtil.isNotEmpty(delStocks)) {
for (RetifiveStock delStock : delStocks) {
// 将股票设为退市状态
delStock.setIsDelisted(1);
delStock.setUpdateTime(new Date());
}
retifiveStockService.repository().saveAll(delStocks);
}
log.info("每天6点定时同步Refinitiv股票数据IN/CHANGE结束, 受影响删除数{}, 耗时:{}毫秒", delStocks.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
// 当类型为今日数据时才更新
List<RetifiveStock> updateStocks = allRetifiveStocks.stream()
.filter(item -> wasSymbols.contains(item.getSymbol()))
.collect(Collectors.toList());
if (CollUtil.isNotEmpty(updateStocks)) {
// 执行更新操作
for (RetifiveStock updateStock : updateStocks) {
// 定位到原来的股票数据
InChangeEntity wasEntity = updateList.stream().filter(item -> item.getRic().equals(updateStock.getSymbol())).collect(Collectors.toList()).get(0);
// 定位到要更新的股票数据
InChangeEntity nowEntity = updateList.get(updateList.indexOf(wasEntity) + 1);
// 进行更新赋值
updateStock.setStockName(nowEntity.getDescription());
updateStock.setStockCode(nowEntity.getCodeTicker());
updateStock.setSymbol(nowEntity.getRic());
updateStock.setUpdateTime(new Date());
}
retifiveStockService.repository().saveAll(updateStocks);
}
log.info("每天6点定时同步Refinitiv股票数据IN/CHANGE结束, 受影响更新数{}, 耗时:{}毫秒", updateStocks.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
} catch (Exception e) {
log.error("每天18点定时同步Refinitiv股票数据出错数据未查出nextLink = {}", nextLink, e);
}
return symbols.stream().distinct().collect(Collectors.toList());
}
private String decode(Map<String, String> dataMap, List<String> symbols) {
String nextLink = null;
if (dataMap.get("NEXT_LR") == null) {
return nextLink;
}
for (Map.Entry<String, String> entry : dataMap.entrySet()) {
if (entry.getKey().startsWith("LINK_") && !entry.getValue().contains("blank data")) {
symbols.add(entry.getValue());
}
if (entry.getKey().startsWith("NEXT_LR")) {
nextLink = entry.getValue();
log.error("每天6点定时同步Refinitiv股票数据IN/CHANGE异常, 错误:", e);
}
}
return nextLink;
}
}

View File

@@ -0,0 +1,22 @@
package cn.stock.market.lesg;
import lombok.Builder;
import lombok.Data;
/**
* InChangeEntity
*
* @author jnerh
* @since 2024/05/09 16:39
*/
@Data
@Builder
public class InChangeEntity {
private String date;
private String description;
private String typ;
private String ric;
private String isin;
private String codeTicker;
private String cls;
}

View File

@@ -5,15 +5,16 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import cn.stock.market.constant.InChangeDataTypeEnum;
import cn.stock.market.constant.InChangeType;
import cn.stock.market.domain.basic.entity.RetifiveStock;
import cn.stock.market.dto.RetifiveStockHistoryResponse;
import cn.stock.market.dto.RetifiveStockInfo;
import cn.stock.market.dto.StockHistoryResponse;
import cn.stock.market.utils.DateTimeUtil;
import cn.stock.market.utils.RefinitivUtil;
import com.ag.exception.SysTipsException;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.refinitiv.ema.access.*;
import com.refinitiv.ema.rdm.EmaRdm;
import lombok.SneakyThrows;
@@ -26,18 +27,15 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.*;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.stream.Collectors;
/**
* RefinitivConsumer
@@ -48,6 +46,17 @@ import static java.util.concurrent.TimeUnit.SECONDS;
@Service
@Slf4j
public class RefinitivConsumer implements ApplicationRunner {
/**
* 股票的IN/CHANGE范围为 2-40
*/
private static final int SYMBOL_CHANGE_MAX_SIZE = 40;
private static final String SYMBOL_CHANGE_PREFIX = "IN/CHANGE";
private static final String SYMBOL_LINK_PREFIX = "#EQ.BO";
@Resource
private RedisTemplate redisTemplate;
@@ -106,7 +115,6 @@ public class RefinitivConsumer implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("Initialize the consumer and connect to market data system....");
initialize();
}
@@ -242,6 +250,142 @@ public class RefinitivConsumer implements ApplicationRunner {
return list;
}
/**
* 获取所有股票的symbol
*
* @return 结果
*/
public List<String> getSymbolList() {
List<String> symbols = new ArrayList<>();
int seq = 0;
String symbolLink = seq + SYMBOL_LINK_PREFIX;
String nextLink = null;
try {
InstrumentData[] dataBySymbol = this.getDataBySymbol(symbolLink);
nextLink = RefinitivUtil.decode(dataBySymbol[0].getDataMap(), symbols);
while (StringUtils.isNotBlank(nextLink) && !nextLink.contains("blank data")) {
dataBySymbol = this.getDataBySymbol(nextLink);
nextLink = RefinitivUtil.decode(dataBySymbol[0].getDataMap(), symbols);
}
} catch (Exception e) {
log.error("每天18点定时同步Refinitiv股票数据出错数据未查出nextLink = {}", nextLink, e);
}
return symbols;
}
/**
* 判断是否可以获取下一页数据
*
* 如果是要查今天的数据需要先change2
* 1 如果change2中包含今天的数据
* 1判断最后一条是否为今天的数据
* a如果是今天的数据需要再查change3依次类推
* b) 如果不是则停止查询,直接取出所有的今天的数据
* 2 不包含今天的数据
* 1) 判断最后一条是否为今天之后的数据
* a) 是需要再查change3依次类推
* b) 不是,则停止查询,今天没有需要的数据
*
* @param list InChange数据
* @return 结果
*/
private boolean canGoNext(List<InChangeEntity> list) {
// 获取当天的数据
List<InChangeEntity> todayData = list.stream().filter(item -> item.getDate().equals(LocalDate.now().toString())).collect(Collectors.toList());
// 获取到最后一条数据
InChangeEntity lastEntity = list.get(list.size() - 1);
if (CollUtil.isNotEmpty(todayData)) {
// 如果list中包含今天的数据则判断最后一条是否为今天的数据
return LocalDate.parse(lastEntity.getDate()).isEqual(LocalDate.now());
} else {
// 如果list中不包含今天的数据则判断最后一条是否为今天之后的数据
return LocalDate.parse(lastEntity.getDate()).isAfter(LocalDate.now());
}
}
/**
* 获取要操作的股票
*
* @param inChangeType 类型
* @return 结果
*/
public Map<InChangeDataTypeEnum, List<InChangeEntity>> getInChangeSymbolList(InChangeType inChangeType) {
List<InChangeEntity> allInChangeData = queryInChangeData(inChangeType);
return assembleDataMap(allInChangeData);
}
/**
* 获取IN/CHANGE数据
*
* @param inChangeType 类型
* @return 结果
*/
private List<InChangeEntity> queryInChangeData(InChangeType inChangeType) {
List<InChangeEntity> allInChangeData = new ArrayList<>();
for (int i = 2; i <= SYMBOL_CHANGE_MAX_SIZE; i++) {
String symbol = SYMBOL_CHANGE_PREFIX + i;
try {
InstrumentData[] dataBySymbol = this.getDataBySymbol(symbol);
InstrumentData instrumentData = dataBySymbol[0];
if (isInvalidResult(instrumentData)) {
continue;
}
List<InChangeEntity> list = RefinitivUtil.decodeInChangeData(instrumentData);
allInChangeData.addAll(list);
if (inChangeType == InChangeType.TODAY) {
if (!canGoNext(list)) {
break;
}
}
} catch (Exception e) {
log.error("每天18点定时同步Refinitiv股票数据IN/CHANGE出错symbol = {}", symbol, e);
}
}
if (inChangeType == InChangeType.TODAY) {
// 过滤出今天的数据
return allInChangeData.stream().filter(item -> LocalDate.parse(item.getDate()).isEqual(LocalDate.now())).collect(Collectors.toList());
} else {
// 过滤出不晚于今天的数据
return allInChangeData.stream().filter(item -> !LocalDate.parse(item.getDate()).isAfter(LocalDate.now())).collect(Collectors.toList());
}
}
/**
* 组装要操作的数据
*
* @param allInChangeData 要操作的数据
* @return 结果
*/
private Map<InChangeDataTypeEnum, List<InChangeEntity>> assembleDataMap(List<InChangeEntity> allInChangeData) {
Map<InChangeDataTypeEnum, List<InChangeEntity>> map = new HashMap<>(100);
List<InChangeEntity> delList = new ArrayList<>();
// List<InChangeEntity> addList = new ArrayList<>();
List<InChangeEntity> updateList = new ArrayList<>();
for (int i = 0; i < allInChangeData.size(); i++) {
InChangeEntity inChange = allInChangeData.get(i);
if (InChangeDataTypeEnum.DEL.name().equals(inChange.getTyp())) {
delList.add(inChange);
}
// if (InChangeDataTypeEnum.ADD.name().equals(inChange.getTyp())) {
// addList.add(inChange);
// }
if (InChangeDataTypeEnum.WAS.name().equals(inChange.getTyp()) && i < allInChangeData.size() - 1) {
updateList.addAll(Lists.newArrayList(allInChangeData.get(i + 1), inChange));
}
}
Collections.reverse(updateList);
map.put(InChangeDataTypeEnum.DEL, delList);
// map.put(InChangeDataTypeEnum.ADD, addList);
map.put(InChangeDataTypeEnum.WAS, updateList);
return map;
}
/**
* 获取多个推荐股票详情
*

View File

@@ -1,13 +1,12 @@
package cn.stock.market.utils;
import cn.hutool.core.util.StrUtil;
import cn.stock.market.constant.MonthEnum;
import cn.stock.market.domain.basic.entity.RetifiveStock;
import cn.stock.market.dto.RetifiveStockInfo;
import cn.stock.market.lesg.InChangeEntity;
import cn.stock.market.lesg.InstrumentData;
import com.google.common.collect.Lists;
import com.refinitiv.ema.access.DataType;
import com.refinitiv.ema.access.FieldEntry;
import com.refinitiv.ema.access.FieldList;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.ReflectionUtils;
@@ -29,6 +28,8 @@ public class RefinitivUtil {
public static final String BLANK_DATA = "(blank data)";
public static final List<String> STOCK_CODE_FIELDS = Lists.newArrayList("EXCHCODE", "PROV_SYMB", "MNEMONIC", "SRC_SYMB");
public static final String BSE_SYMBOL_SUFFIX = ".BO";
private static RetifiveStockInfo handleData(RetifiveStockInfo retifiveStockInfo) {
Field[] fields = retifiveStockInfo.getClass().getDeclaredFields();
Arrays.stream(fields).forEach(field -> {
@@ -203,6 +204,75 @@ public class RefinitivUtil {
return result;
}
public static List<InChangeEntity> decodeInChangeData(InstrumentData instrumentData) {
List<String> list = new ArrayList<>(100);
instrumentData.getDataMap().forEach((k, v) -> {
if (StrUtil.contains(k, "ROW80_") && !StrUtil.containsAny(v, "IN/CHANGE")) {
list.add(k);
}
});
List<String> inChangeSymbolList = list.stream().sorted(inChangeComparator()).collect(Collectors.toList());
List<String> result = new ArrayList<>();
for (String key : inChangeSymbolList) {
result.add(instrumentData.getDataMap().get(key));
}
String flagStr = result.remove(0);
int dateBeginIdx = 0;
int dateEndIdx = flagStr.indexOf("DESCRIPTION") - 1;
int descriptionBeginIdx = flagStr.indexOf("DESCRIPTION");
int descriptionEndIdx = flagStr.indexOf("TYP") - 1;
int typBeginIdx = flagStr.indexOf("TYP");
int typEndIdx = flagStr.indexOf("RIC") - 1;
int ricBeginIdx = flagStr.indexOf("RIC");
int ricEndIdx = flagStr.indexOf("ISIN") - 1;
int isinBeginIdx = flagStr.indexOf("ISIN");
int isinEndIdx = flagStr.indexOf("CODE") - 1;
int codeBeginIdx = flagStr.indexOf("CODE");
int codeEndIdx = flagStr.indexOf("CLS") - 1;
int clsBeginIdx = flagStr.indexOf("CLS");
List<InChangeEntity> inChangeEntityList = new ArrayList<>();
for (String data : result) {
String date = data.substring(dateBeginIdx, dateEndIdx).trim();
String description = data.substring(descriptionBeginIdx, descriptionEndIdx).trim();
String typ = data.substring(typBeginIdx, typEndIdx).trim();
String ric = parseInChangeRic(data.substring(ricBeginIdx, ricEndIdx).trim());
if (!ric.endsWith(BSE_SYMBOL_SUFFIX)) {
// 暂无相关权限处理非孟买国家交易所的股票
continue;
}
String isin = data.substring(isinBeginIdx, isinEndIdx).trim();
String codeTicker = data.substring(codeBeginIdx, codeEndIdx).trim();
String cls = data.substring(clsBeginIdx).trim();
InChangeEntity inChangeEntity = InChangeEntity.builder()
.date(parseInChangeDate(date))
.description(description)
.typ(typ)
.ric(ric)
.isin(isin)
.codeTicker(codeTicker)
.cls(cls)
.build();
inChangeEntityList.add(inChangeEntity);
}
return inChangeEntityList;
}
private static String parseInChangeDate(String date) {
String day = date.substring(0, 2);
String month = date.substring(2, 5);
String year = date.substring(5);
return String.format("20%s-%s-%s", year, Objects.requireNonNull(MonthEnum.of(month)).remark(), day);
}
private static String parseInChangeRic(String ric) {
return StrUtil.replace(ric, "<", "")
.replace(">", "");
}
private static Comparator<String> topComparator() {
String flag = "BR_LINK";
return (s1, s2) -> {
@@ -213,4 +283,35 @@ public class RefinitivUtil {
return seq1 - seq2;
};
}
private static Comparator<String> inChangeComparator() {
String flag = "ROW80_";
return (s1, s2) -> {
String seqStr1 = s1.substring(flag.indexOf(s1) + flag.length() + 1);
String seqStr2 = s2.substring(flag.indexOf(s2) + flag.length() + 1);
int seq1 = Integer.parseInt(seqStr1);
int seq2 = Integer.parseInt(seqStr2);
return seq1 - seq2;
};
}
public static String decode(Map<String, String> dataMap, List<String> symbols) {
String nextLink = null;
if (dataMap.get("NEXT_LR") == null) {
return nextLink;
}
for (Map.Entry<String, String> entry : dataMap.entrySet()) {
if (entry.getKey().startsWith("LINK_") && !entry.getValue().contains("blank data")) {
symbols.add(entry.getValue());
}
if (entry.getKey().startsWith("NEXT_LR")) {
nextLink = entry.getValue();
}
}
return nextLink;
}
}