支持Retifive的操作:批量获取操作

This commit is contained in:
Achilles
2024-04-20 18:35:33 +08:00
parent c3b3259646
commit 87202f2b91
2 changed files with 72 additions and 22 deletions

View File

@@ -1,24 +1,32 @@
package cn.stock.market.listener;
import com.thomsonreuters.ema.access.AckMsg;
import com.thomsonreuters.ema.access.ElementList;
import com.thomsonreuters.ema.access.EmaFactory;
import com.thomsonreuters.ema.access.GenericMsg;
import com.thomsonreuters.ema.access.Msg;
import com.thomsonreuters.ema.access.OmmArray;
import com.thomsonreuters.ema.access.OmmConsumer;
import com.thomsonreuters.ema.access.OmmConsumerClient;
import com.thomsonreuters.ema.access.OmmConsumerConfig;
import com.thomsonreuters.ema.access.OmmConsumerEvent;
import com.thomsonreuters.ema.access.Payload;
import com.thomsonreuters.ema.access.RefreshMsg;
import com.thomsonreuters.ema.access.ReqMsg;
import com.thomsonreuters.ema.access.StatusMsg;
import com.thomsonreuters.ema.access.UpdateMsg;
import com.thomsonreuters.ema.rdm.EmaRdm;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Service
@Slf4j
@@ -27,6 +35,19 @@ public class AppClient implements OmmConsumerClient {
private CompletableFuture<RefreshMsg> messageFuture = new CompletableFuture<>();
private OmmConsumer consumer;
private CompletableFuture<List<RefreshMsg>> messagesFuture = new CompletableFuture<>();
private ConcurrentHashMap<String, RefreshMsg> messages = new ConcurrentHashMap<>();
private AtomicInteger expectedItemCount = new AtomicInteger(0); // 使用AtomicInteger确保线程安全
public CompletableFuture<List<RefreshMsg>> getMessagesFuture() {
return messagesFuture;
}
public void resetMessagesFuture() {
this.messagesFuture = new CompletableFuture<>(); // 重置CompletableFuture
}
@Autowired
@@ -37,6 +58,7 @@ public class AppClient implements OmmConsumerClient {
// 根据itemName进行订阅的方法
public void subscribe(String itemName) {
resetMessageFuture(); // 重置Future以便新的请求
this.expectedItemCount = new AtomicInteger(1); // 设置预期的消息数量
ReqMsg reqMsg = EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").name(itemName);
consumer.registerClient(reqMsg, this);
}
@@ -53,7 +75,39 @@ public class AppClient implements OmmConsumerClient {
@Override
public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event) {
// log.error("监听的消息:"+refreshMsg.toString());
messageFuture.complete(refreshMsg);
if(expectedItemCount.get()==1){
messageFuture.complete(refreshMsg);
}
if(expectedItemCount.get()>1){
/* System.out.println("监听的消息:"+refreshMsg.streamId()+","+refreshMsg.name()+
",是否完成:"+refreshMsg.complete());*/
RefreshMsg msg = EmaFactory.createRefreshMsg(refreshMsg);
messages.put(refreshMsg.name()+"", msg); // 假设每个refreshMsg都有一个uniqueIdentifier方法
if (messages.size() == expectedItemCount.get()) { // 使用get()获取当前值
List<RefreshMsg> resultList = new ArrayList<>(messages.values());
messagesFuture.complete(resultList);
}
}
}
public void subscribeList(List<String> itemNames) {
resetMessagesFutures(); // 重置Future以便新的请求
this.expectedItemCount = new AtomicInteger(itemNames.size()); // 设置预期的消息数量
ElementList batch = EmaFactory.createElementList();
OmmArray array = EmaFactory.createOmmArray();
if (!itemNames.isEmpty()) {
itemNames.forEach(itemName -> array.add(EmaFactory.createOmmArrayEntry().ascii(itemName)));
batch.add(EmaFactory.createElementEntry().array(EmaRdm.ENAME_BATCH_ITEM_LIST, array));
consumer.registerClient(EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").payload(batch), this);
}
}
private void resetMessagesFutures() {
this.messages.clear(); // 清理之前的消息记录
this.messagesFuture = new CompletableFuture<>();
this.expectedItemCount = new AtomicInteger(0);
}
@Override

View File

@@ -217,7 +217,6 @@ public class MessageRetifiveController {
@GetMapping("/getBSETopActivesList")
public ServerResponse<?> getGainerList(String itemName) {
// 计算每个线程应处理的元素数量
String name = ".AV.BO";
List<RetifiveStockInfo> list = Lists.newArrayList();
@@ -227,13 +226,11 @@ public class MessageRetifiveController {
RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间例如10秒
if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){
List<String> strings = decode4(refreshMsg.payload().fieldList());
for (String link : strings) {
appClient.subscribe(link); // 根据itemName订阅
// 等待消息
RefreshMsg refreshMsg2 = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);
if (DataType.DataTypes.FIELD_LIST == refreshMsg2.payload().dataType()){
list.add(decode3(refreshMsg2.payload().fieldList(),refreshMsg2.name()));
}
appClient.subscribeList(strings); // 根据itemName订阅
// 等待消息
List<RefreshMsg> refreshMsgs = appClient.getMessagesFuture().get(10, TimeUnit.SECONDS);
for (RefreshMsg msg : refreshMsgs) {
list.add(decode3(msg.payload().fieldList(),msg.name()));
}
}
} catch (Exception e) {
@@ -254,13 +251,11 @@ public class MessageRetifiveController {
RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间例如10秒
if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){
List<String> strings = decode4(refreshMsg.payload().fieldList());
for (String link : strings) {
appClient.subscribe(link); // 根据itemName订阅
// 等待消息
RefreshMsg refreshMsg2 = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);
if (DataType.DataTypes.FIELD_LIST == refreshMsg2.payload().dataType()){
list.add(decode3(refreshMsg2.payload().fieldList(),refreshMsg2.name()));
}
appClient.subscribeList(strings); // 根据itemName订阅
// 等待消息
List<RefreshMsg> refreshMsgs = appClient.getMessagesFuture().get(10, TimeUnit.SECONDS);
for (RefreshMsg msg : refreshMsgs) {
list.add(decode3(msg.payload().fieldList(),msg.name()));
}
}
} catch (Exception e) {
@@ -270,6 +265,7 @@ public class MessageRetifiveController {
}
@GetMapping("/getBSETopLoserList")
public ServerResponse<?> getBSETopLoserList() {
@@ -282,13 +278,11 @@ public class MessageRetifiveController {
RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间例如10秒
if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){
List<String> strings = decode4(refreshMsg.payload().fieldList());
for (String link : strings) {
appClient.subscribe(link); // 根据itemName订阅
appClient.subscribeList(strings); // 根据itemName订阅
// 等待消息
RefreshMsg refreshMsg2 = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);
if (DataType.DataTypes.FIELD_LIST == refreshMsg2.payload().dataType()){
list.add(decode3(refreshMsg2.payload().fieldList(),refreshMsg2.name()));
}
List<RefreshMsg> refreshMsgs = appClient.getMessagesFuture().get(10, TimeUnit.SECONDS);
for (RefreshMsg msg : refreshMsgs) {
list.add(decode3(msg.payload().fieldList(),msg.name()));
}
}
} catch (Exception e) {
@@ -297,6 +291,8 @@ public class MessageRetifiveController {
return ServerResponse.createBySuccess("操作成功",list);
}
private List<String> decode4(FieldList fieldList) {
List<String> list = Lists.newArrayList();
String nextLink = "";