更換refinitiv获取股票数据底层实现

This commit is contained in:
zhangjian
2024-04-29 11:50:58 +08:00
parent 83e518b3f7
commit b73bd18289
8 changed files with 487 additions and 106 deletions

View File

@@ -14,6 +14,7 @@
<maven.compile.source>1.8</maven.compile.source>
<maven.compile.target>1.8</maven.compile.target>
<maven.build.timestamp.format>MMdd-HHmm</maven.build.timestamp.format>
<rtsdk.version>3.6.5.0</rtsdk.version>
</properties>
<groupId>cn.stock</groupId>
<artifactId>market</artifactId>
@@ -195,7 +196,11 @@
<artifactId>ema</artifactId>
<version>3.5.1.0</version>
</dependency>
<dependency>
<groupId>com.refinitiv.ema</groupId>
<artifactId>ema</artifactId>
<version>${rtsdk.version}</version>
</dependency>
</dependencies>

View File

@@ -0,0 +1,75 @@
package cn.stock.market.lesg;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @author Administrator
*/
public class Batch {
private static int id = 0;
private int thisID = 0;
private long _startTimeStamp = 0, _endTimeStamp = 0;
private Map<String, InstrumentData> batchInstruments;
private CountDownLatch countDownLatch;
private long timeout = 0;
Batch(String[] rics, long timeout) {
thisID = id++;
this.timeout = timeout;
batchInstruments = new HashMap<String, InstrumentData>(rics.length);
for(String ric : rics) {
batchInstruments.put(ric, new InstrumentData(ric));
}
countDownLatch = new CountDownLatch(batchInstruments.size());
}
public String[] getAllRics() {
return batchInstruments.keySet().toArray(new String[batchInstruments.size()]);
}
public InstrumentData getInstrument(String ric) throws Exception {
if(!batchInstruments.containsKey(ric)) {
throw new Exception("RIC not found in batch: " + ric);
}
return batchInstruments.get(ric);
}
public InstrumentData[] getAllInstruments() {
return batchInstruments.values().toArray(new InstrumentData[0]);
}
public void recordStartTime() {
_startTimeStamp = System.currentTimeMillis();
_endTimeStamp = 0;
}
public void recordEndTime() {
_endTimeStamp = System.currentTimeMillis();
}
public boolean await() throws Exception {
return countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
}
public void countDown() {
countDownLatch.countDown();
}
public long batchFulfilmentTime() {
return _endTimeStamp - _startTimeStamp;
}
public int getBatchId() {
return thisID;
}
}

View File

@@ -0,0 +1,34 @@
package cn.stock.market.lesg;
import com.refinitiv.ema.access.*;
import lombok.Data;
import java.util.HashMap;
@Data
public class InstrumentData {
public String ric;
public String dataState = "";
public HashMap<String, String> dataMap = new HashMap<>();
public InstrumentData(String ric) {
this.ric = ric;
}
public void setState(OmmState state) {
dataState = state.dataStateAsString() + "|" +
state.statusCodeAsString() + "|" +
state.statusText() + "|" +
state.streamStateAsString();
}
public void decode(FieldList fieldList) {
for(FieldEntry fieldEntry : fieldList)
dataMap.put(fieldEntry.name(), fieldEntry.load().toString());
}
}

View File

@@ -0,0 +1,55 @@
package cn.stock.market.lesg;
import cn.stock.market.listener.AppClient;
import com.refinitiv.ema.access.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class RefinitivAppClient implements OmmConsumerClient {
private static final Logger LOG = LoggerFactory.getLogger(AppClient.class);
public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event) {
try {
Batch bRequest = (Batch) event.closure();
InstrumentData instr = bRequest.getInstrument(refreshMsg.name());
instr.setState(refreshMsg.state());
if(DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()) {
instr.decode(refreshMsg.payload().fieldList());
}
bRequest.countDown();
}
catch(Exception e) {
LOG.error("Exception processing Refresh callback");
LOG.error("Message: ", refreshMsg);
LOG.error("Exception: ", e);
}
}
public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent event) {
if(statusMsg.hasName()) {
try {
Batch bRequest = (Batch) event.closure();
InstrumentData instr = bRequest.getInstrument(statusMsg.name());
instr.setState(statusMsg.state());
bRequest.countDown();
}
catch(Exception e) {
LOG.error("Exception processing Status callback");
LOG.error("Message: ", statusMsg);
LOG.error("Exception: ", e);
}
}
}
public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent event) {}
public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent) {}
public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent) {}
public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent) {}
}

View File

@@ -0,0 +1,202 @@
package cn.stock.market.lesg;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.stock.market.dto.RetifiveStockInfo;
import cn.stock.market.utils.RefinitivUtil;
import com.refinitiv.ema.access.*;
import com.refinitiv.ema.rdm.EmaRdm;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
* RefinitivConsumer
*
* @author jnerh
* @since 2024/04/29 10:37
*/
@Service
@Slf4j
public class RefinitivConsumer implements ApplicationRunner {
@Value("${refinitiv.market-data.service-name}")
private String serviceName;
@Value("${refinitiv.market-data.username}")
private String username;
@Value("${refinitiv.market-data.password}")
private String password;
@Value("${refinitiv.market-data.clientId}")
private String clientId;
@Value("${refinitiv.market-data.batch-request-timeout}")
private long timeout;
private OmmConsumer consumer = null;
public static final String TOP_ACTIVES_SYMBOL = ".AV.BO";
public static final String TOP_GAINERS_SYMBOL = ".PG.BO";
public static final String TOP_LOSERS_SYMBOL = ".PL.BO";
public InstrumentData[] getDataBySymbol(String symbol) throws Exception {
String[] items = symbol.split(",");
log.info("Quote request for: {}", java.util.Arrays.toString(items));
// create a batch request
Batch btc = new Batch(items, timeout);
// price it
this.synchronousRequest(btc);
// send json array response
return btc.getAllInstruments();
}
/**
* 获取股票详情
*
* @param symbol ric
* @return 结果
* @throws Exception e
*/
public RetifiveStockInfo getDetail(String symbol) throws Exception {
InstrumentData[] result = getDataBySymbol(symbol);
InstrumentData instrumentData = result[0];
// 判断数据有效性
if (isInvalidResult(instrumentData)) {
return null;
}
return RefinitivUtil.decode(instrumentData);
}
/**
* 获取股票推荐TopActives
*
* @return 结果
* @throws Exception e
*/
public List<RetifiveStockInfo> getTopActives() throws Exception {
return getTopStockList(TOP_ACTIVES_SYMBOL);
}
/**
* 获取股票推荐TopGainers
*
* @return 结果
* @throws Exception e
*/
public List<RetifiveStockInfo> getTopGainers() throws Exception {
return getTopStockList(TOP_GAINERS_SYMBOL);
}
/**
* 获取股票推荐TopLosers
*
* @return 结果
* @throws Exception e
*/
public List<RetifiveStockInfo> getTopLosers() throws Exception {
return getTopStockList(TOP_LOSERS_SYMBOL);
}
/**
* 获取多个股票详情
*
* @param symbol ric
* @return 结果
* @throws Exception e
*/
public List<RetifiveStockInfo> getDetailList(String symbol) throws Exception {
return getStockList(symbol);
}
/**
* 获取多个股票详情
*
* @param symbol ric
* @return 结果
* @throws Exception e
*/
public List<RetifiveStockInfo> getStockList(String symbol) throws Exception {
List<RetifiveStockInfo> list = new ArrayList<>();
InstrumentData[] result = getDataBySymbol(symbol);
for (InstrumentData instrumentData : result) {
if (!isInvalidResult(instrumentData)) {
RetifiveStockInfo retifiveStockInfo = RefinitivUtil.decode(instrumentData);
list.add(retifiveStockInfo);
}
}
return list;
}
/**
* 获取多个推荐股票详情
*
* @param symbol ric
* @return 结果
* @throws Exception e
*/
public List<RetifiveStockInfo> getTopStockList(String symbol) throws Exception {
List<RetifiveStockInfo> list = new ArrayList<>();
InstrumentData[] result = getDataBySymbol(symbol);
if (result.length == 0 || isInvalidResult(result[0])) {
return list;
}
List<String> topSymbols = RefinitivUtil.decodeTopData(result[0]);
for (String topSymbol : topSymbols) {
RetifiveStockInfo stockInfo = getDetail(topSymbol);
list.add(stockInfo);
}
return list;
}
/**
* 校验数据有效性
*
* @param instrumentData 数据
* @return 结果
*/
private boolean isInvalidResult(InstrumentData instrumentData) {
String dataState = instrumentData.getDataState();
return CollUtil.isEmpty(instrumentData.getDataMap()) ||
StrUtil.containsAny(dataState, "Suspect", "NotFound", "**The record could not be found", "CLOSED");
}
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("Initialize the consumer and connect to market data system....");
OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig();
config.username(username);
config.password(password);
config.clientId(clientId);
consumer = EmaFactory.createOmmConsumer(config.consumerName("Consumer_4"));
}
public void synchronousRequest(Batch bRequest) throws Exception {
ElementList eList = EmaFactory.createElementList();
OmmArray array = EmaFactory.createOmmArray();
for (String instr : bRequest.getAllRics()) {
array.add(EmaFactory.createOmmArrayEntry().ascii(instr));
}
eList.add(EmaFactory.createElementEntry().array(EmaRdm.ENAME_BATCH_ITEM_LIST, array));
consumer.registerClient(EmaFactory.createReqMsg().serviceName(serviceName).payload(eList).interestAfterRefresh(false), new RefinitivAppClient(), bRequest);
// wait for batch to be fulfilled
bRequest.await();
}
public void disconnect() {
if (consumer != null) {
consumer.uninitialize();
}
}
}

View File

@@ -1,6 +1,8 @@
package cn.stock.market.utils;
import cn.hutool.core.util.StrUtil;
import cn.stock.market.dto.RetifiveStockInfo;
import cn.stock.market.lesg.InstrumentData;
import com.google.common.collect.Lists;
import com.thomsonreuters.ema.access.DataType;
import com.thomsonreuters.ema.access.FieldEntry;
@@ -13,6 +15,7 @@ import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* RefinitivUtil
@@ -304,4 +307,78 @@ public class RefinitivUtil {
}
return retifiveStockInfo;
}
public static RetifiveStockInfo decode(InstrumentData data) {
Map<String, String> dataMap = data.getDataMap();
RetifiveStockInfo stockInfo = RetifiveStockInfo.builder().symbol(data.getRic()).build();
dataMap.forEach((k, v) -> {
if (k.equals("DSPLY_NAME")) {
stockInfo.setStockName(v);
}
if (k.equals("PROV_SYMB")) {
stockInfo.setStockCode(v);
}
if (k.equals("INST_PHASE")) {
stockInfo.setStatus(v);
}
if (k.equals("OPEN_PRC")) {
stockInfo.setOpenPrice(v);
}
if (k.equals("HST_CLOSE")) {
stockInfo.setPreviousPrice(v);
}
if (k.equals("52WK_HIGH")) {
stockInfo.setWeek52HighPrice(v);
}
if (k.equals("52WK_LOW")) {
stockInfo.setWeek52LowPrice(v);
}
if (k.equals("PCTCHNG")) {
stockInfo.setPerchg(v);
}
if (k.equals("NETCHNG_1")) {
stockInfo.setChange(v);
}
if (k.equals("HIGH_1")) {
stockInfo.setHighPrice(v);
}
if (k.equals("LOW_1")) {
stockInfo.setLowPrice(v);
}
if (k.equals("IRGVOL")) {
stockInfo.setVolume(v);
}
if (k.equals("TRDPRC_1")) {
stockInfo.setLastPrice(v);
}
if (k.equals("RDN_EXCHID")) {
String stockType = v;
if (StrUtil.equals(stockType, "145")) {
//孟买国家交易所
stockType = "bse";
} else if (StrUtil.equals(stockType, "147")) {
//印度国家交易所
stockType = "nse";
}
stockInfo.setStockType(stockType);
}
});
return handleData(stockInfo);
}
public static List<String> decodeTopData(InstrumentData instrumentData) {
List<String> list = Lists.newArrayList();
instrumentData.getDataMap().forEach((k, v) -> {
if (k.startsWith("BR_LINK")) {
if (!v.contains("blank data")) {
list.add(v);
}
}
});
return list;
}
}

View File

@@ -1,17 +1,13 @@
package cn.stock.market.web;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.stock.market.domain.basic.entity.RetifiveStock;
import cn.stock.market.domain.basic.service.RetifiveStockService;
import cn.stock.market.dto.RetifiveStockInfo;
import cn.stock.market.lesg.RefinitivConsumer;
import cn.stock.market.listener.AppClient;
import cn.stock.market.utils.RefinitivUtil;
import cn.stock.market.utils.ServerResponse;
import cn.stock.market.web.annotations.EncryptFilter;
import com.google.common.collect.Lists;
import com.thomsonreuters.ema.access.DataType;
import com.thomsonreuters.ema.access.RefreshMsg;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
@@ -26,7 +22,8 @@ import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static cn.stock.market.lesg.RefinitivConsumer.*;
/**
* @author Administrator
@@ -37,6 +34,8 @@ import java.util.concurrent.TimeUnit;
@RequestMapping({"/api/market/refinitiv", "/api/hq/refinitiv"})
public class RefinitivApiController {
@Resource
private RefinitivConsumer refinitivConsumer;
@Resource
private AppClient appClient;
@Autowired
@@ -48,18 +47,13 @@ public class RefinitivApiController {
@EncryptFilter(decryptRequest = false)
public ServerResponse<?> getStockDetail(String symbol) {
try {
appClient.subscribe(symbol);
// 等待消息, 设置超时时间例如10秒
RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);
if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()) {
RetifiveStockInfo retifiveStockInfo = RefinitivUtil.decodeData(refreshMsg.payload().fieldList(), refreshMsg.name());
//获取股票id
RetifiveStock stock = retifiveStockService.repository().findBtStockByCoCode(symbol);
if(stock != null){
retifiveStockInfo.setId(stock.getId());
}
return ServerResponse.createBySuccess("操作成功", retifiveStockInfo);
RetifiveStockInfo retifiveStockInfo = refinitivConsumer.getDetail(symbol);
//获取股票id
RetifiveStock stock = retifiveStockService.repository().findBtStockByCoCode(symbol);
if(stock != null){
retifiveStockInfo.setId(stock.getId());
}
return ServerResponse.createBySuccess("操作成功", retifiveStockInfo);
} catch (Exception e) {
log.error("获取股票详情link:" + symbol, e);
} finally {
@@ -75,27 +69,13 @@ public class RefinitivApiController {
@GetMapping("/getStockDetailList")
@EncryptFilter(decryptRequest = false)
public ServerResponse<?> getStockDetailList(String symbol) {
// 计算每个线程应处理的元素数量
String[] itemNames = symbol.split(",");
List<RetifiveStockInfo> list = Lists.newArrayList();
for (String name : itemNames) {
try {
appClient.subscribe(name);
// 等待消息, 设置超时时间例如10秒
RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);
if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()) {
RetifiveStockInfo retifiveStockInfo = RefinitivUtil.decodeData(refreshMsg.payload().fieldList(), refreshMsg.name());
list.add(retifiveStockInfo);
}
} catch (Exception e) {
log.error("获取股票详情link:" + name, e);
} finally {
// 可能需要重置或清理资源
appClient.resetMessageFuture();
}
try {
return ServerResponse.createBySuccess("操作成功", refinitivConsumer.getDetailList(symbol));
} catch (Exception e) {
log.error("获取股票详情列表报错,", e);
}
return ServerResponse.createBySuccess("操作成功", list);
return ServerResponse.createByError();
}
@ApiOperation(value = "获取股票推荐TopActives", httpMethod = "GET", response = RetifiveStockInfo.class)
@@ -106,33 +86,14 @@ public class RefinitivApiController {
return ServerResponse.createBySuccess("操作成功", new ArrayList<>());
}
String name = ".AV.BO";
List<RetifiveStockInfo> list = Lists.newArrayList();
try {
appClient.subscribe(name); // 根据itemName订阅
// 等待消息
RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间例如10秒
if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()) {
List<String> strings = RefinitivUtil.decode(refreshMsg.payload().fieldList());
if (CollUtil.isEmpty(strings)) {
return ServerResponse.createBySuccess("操作成功", list);
}
appClient.subscribeList(strings); // 根据itemName订阅
// 等待消息
List<RefreshMsg> refreshMsgs = appClient.getMessagesFuture().get(10, TimeUnit.SECONDS);
for (RefreshMsg msg : refreshMsgs) {
list.add(RefinitivUtil.decode(msg.payload().fieldList(), msg.name()));
}
}
List<RetifiveStockInfo> result = refinitivConsumer.getTopActives();
return ServerResponse.createBySuccess("操作成功", result);
} catch (Exception e) {
log.error("获取股票详情link:" + name, e);
} finally {
// 可能需要重置或清理资源
appClient.resetMessageFuture();
log.error("获取股票详情link:" + TOP_ACTIVES_SYMBOL, e);
}
return ServerResponse.createBySuccess("操作成功", list);
return ServerResponse.createByError();
}
@ApiOperation(value = "获取股票推荐TopGainers", httpMethod = "GET", response = RetifiveStockInfo.class)
@@ -142,33 +103,15 @@ public class RefinitivApiController {
if (StrUtil.equalsIgnoreCase(stockType, "nse")) {
return ServerResponse.createBySuccess("操作成功", new ArrayList<>());
}
String name = ".PG.BO";
List<RetifiveStockInfo> list = Lists.newArrayList();
try {
appClient.subscribe(name); // 根据itemName订阅
// 等待消息
RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间例如10秒
if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()) {
List<String> strings = RefinitivUtil.decode(refreshMsg.payload().fieldList());
if (CollUtil.isEmpty(strings)) {
return ServerResponse.createBySuccess("操作成功", list);
}
appClient.subscribeList(strings); // 根据itemName订阅
// 等待消息
List<RefreshMsg> refreshMsgs = appClient.getMessagesFuture().get(10, TimeUnit.SECONDS);
for (RefreshMsg msg : refreshMsgs) {
list.add(RefinitivUtil.decode(msg.payload().fieldList(), msg.name()));
}
}
try {
List<RetifiveStockInfo> result = refinitivConsumer.getTopGainers();
return ServerResponse.createBySuccess("操作成功", result);
} catch (Exception e) {
log.error("获取股票详情link:" + name, e);
} finally {
// 可能需要重置或清理资源
appClient.resetMessageFuture();
log.error("获取股票详情link:" + TOP_GAINERS_SYMBOL, e);
}
return ServerResponse.createBySuccess("操作成功", list);
return ServerResponse.createByError();
}
@ApiOperation(value = "获取股票推荐TopLosers", httpMethod = "GET", response = RetifiveStockInfo.class)
@@ -179,31 +122,13 @@ public class RefinitivApiController {
return ServerResponse.createBySuccess("操作成功", new ArrayList<>());
}
String name = ".PL.BO";
List<RetifiveStockInfo> list = Lists.newArrayList();
try {
appClient.subscribe(name); // 根据itemName订阅
// 等待消息
RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间例如10秒
if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){
List<String> strings = RefinitivUtil.decode(refreshMsg.payload().fieldList());
if (CollUtil.isEmpty(strings)) {
return ServerResponse.createBySuccess("操作成功", list);
}
appClient.subscribeList(strings); // 根据itemName订阅
// 等待消息
List<RefreshMsg> refreshMsgs = appClient.getMessagesFuture().get(10, TimeUnit.SECONDS);
for (RefreshMsg msg : refreshMsgs) {
list.add(RefinitivUtil.decode(msg.payload().fieldList(),msg.name()));
}
}
List<RetifiveStockInfo> result = refinitivConsumer.getTopLosers();
return ServerResponse.createBySuccess("操作成功", result);
} catch (Exception e) {
log.error("获取股票详情link:"+name,e);
} finally {
// 可能需要重置或清理资源
appClient.resetMessageFuture();
log.error("获取股票详情link:" + TOP_LOSERS_SYMBOL, e);
}
return ServerResponse.createBySuccess("操作成功",list);
return ServerResponse.createByError();
}
}

View File

@@ -59,3 +59,11 @@ news:
src: eastmoney
token: f86299eb129c44545eb8ff53d64902353225652efe63cc4dc95f8936
fields: content,channels
refinitiv:
market-data:
service-name: ELEKTRON_DD
username: GE-A-10288435-3-15856
password: 8EyCWDQ%XITcGRM@RhKokxX05FZJe1
clientId: 662b37b071144c9f8f2507d93037a019a010ae0e
batch-request-timeout: 60000