diff --git a/src/main/java/cn/stock/market/listener/AppClient.java b/src/main/java/cn/stock/market/listener/AppClient.java index 678dc51..2f8f6aa 100644 --- a/src/main/java/cn/stock/market/listener/AppClient.java +++ b/src/main/java/cn/stock/market/listener/AppClient.java @@ -1,24 +1,32 @@ package cn.stock.market.listener; import com.thomsonreuters.ema.access.AckMsg; +import com.thomsonreuters.ema.access.ElementList; import com.thomsonreuters.ema.access.EmaFactory; import com.thomsonreuters.ema.access.GenericMsg; import com.thomsonreuters.ema.access.Msg; +import com.thomsonreuters.ema.access.OmmArray; import com.thomsonreuters.ema.access.OmmConsumer; import com.thomsonreuters.ema.access.OmmConsumerClient; import com.thomsonreuters.ema.access.OmmConsumerConfig; import com.thomsonreuters.ema.access.OmmConsumerEvent; +import com.thomsonreuters.ema.access.Payload; import com.thomsonreuters.ema.access.RefreshMsg; import com.thomsonreuters.ema.access.ReqMsg; import com.thomsonreuters.ema.access.StatusMsg; import com.thomsonreuters.ema.access.UpdateMsg; +import com.thomsonreuters.ema.rdm.EmaRdm; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Service; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; @Service @Slf4j @@ -27,6 +35,19 @@ public class AppClient implements OmmConsumerClient { private CompletableFuture messageFuture = new CompletableFuture<>(); private OmmConsumer consumer; + private CompletableFuture> messagesFuture = new CompletableFuture<>(); + private ConcurrentHashMap messages = new ConcurrentHashMap<>(); + private AtomicInteger expectedItemCount = new AtomicInteger(0); // 使用AtomicInteger确保线程安全 + + + public CompletableFuture> getMessagesFuture() { + return messagesFuture; + } + + public void resetMessagesFuture() { + this.messagesFuture = new CompletableFuture<>(); // 重置CompletableFuture + } + @Autowired @@ -37,6 +58,7 @@ public class AppClient implements OmmConsumerClient { // 根据itemName进行订阅的方法 public void subscribe(String itemName) { resetMessageFuture(); // 重置Future以便新的请求 + this.expectedItemCount = new AtomicInteger(1); // 设置预期的消息数量 ReqMsg reqMsg = EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").name(itemName); consumer.registerClient(reqMsg, this); } @@ -53,7 +75,39 @@ public class AppClient implements OmmConsumerClient { @Override public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event) { // log.error("监听的消息:"+refreshMsg.toString()); - messageFuture.complete(refreshMsg); + if(expectedItemCount.get()==1){ + messageFuture.complete(refreshMsg); + } + if(expectedItemCount.get()>1){ + /* System.out.println("监听的消息:"+refreshMsg.streamId()+","+refreshMsg.name()+ + ",是否完成:"+refreshMsg.complete());*/ + RefreshMsg msg = EmaFactory.createRefreshMsg(refreshMsg); + messages.put(refreshMsg.name()+"", msg); // 假设每个refreshMsg都有一个uniqueIdentifier方法 + if (messages.size() == expectedItemCount.get()) { // 使用get()获取当前值 + List resultList = new ArrayList<>(messages.values()); + messagesFuture.complete(resultList); + } + } + + } + + public void subscribeList(List itemNames) { + resetMessagesFutures(); // 重置Future以便新的请求 + this.expectedItemCount = new AtomicInteger(itemNames.size()); // 设置预期的消息数量 + ElementList batch = EmaFactory.createElementList(); + OmmArray array = EmaFactory.createOmmArray(); + + if (!itemNames.isEmpty()) { + itemNames.forEach(itemName -> array.add(EmaFactory.createOmmArrayEntry().ascii(itemName))); + batch.add(EmaFactory.createElementEntry().array(EmaRdm.ENAME_BATCH_ITEM_LIST, array)); + consumer.registerClient(EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").payload(batch), this); + } + } + + private void resetMessagesFutures() { + this.messages.clear(); // 清理之前的消息记录 + this.messagesFuture = new CompletableFuture<>(); + this.expectedItemCount = new AtomicInteger(0); } @Override diff --git a/src/main/java/cn/stock/market/web/MessageRetifiveController.java b/src/main/java/cn/stock/market/web/MessageRetifiveController.java index beae64a..c5f002d 100644 --- a/src/main/java/cn/stock/market/web/MessageRetifiveController.java +++ b/src/main/java/cn/stock/market/web/MessageRetifiveController.java @@ -217,7 +217,6 @@ public class MessageRetifiveController { @GetMapping("/getBSETopActivesList") public ServerResponse getGainerList(String itemName) { - // 计算每个线程应处理的元素数量 String name = ".AV.BO"; List list = Lists.newArrayList(); @@ -227,13 +226,11 @@ public class MessageRetifiveController { RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间,例如10秒 if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){ List strings = decode4(refreshMsg.payload().fieldList()); - for (String link : strings) { - appClient.subscribe(link); // 根据itemName订阅 - // 等待消息 - RefreshMsg refreshMsg2 = appClient.getMessageFuture().get(10, TimeUnit.SECONDS); - if (DataType.DataTypes.FIELD_LIST == refreshMsg2.payload().dataType()){ - list.add(decode3(refreshMsg2.payload().fieldList(),refreshMsg2.name())); - } + appClient.subscribeList(strings); // 根据itemName订阅 + // 等待消息 + List refreshMsgs = appClient.getMessagesFuture().get(10, TimeUnit.SECONDS); + for (RefreshMsg msg : refreshMsgs) { + list.add(decode3(msg.payload().fieldList(),msg.name())); } } } catch (Exception e) { @@ -254,13 +251,11 @@ public class MessageRetifiveController { RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间,例如10秒 if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){ List strings = decode4(refreshMsg.payload().fieldList()); - for (String link : strings) { - appClient.subscribe(link); // 根据itemName订阅 - // 等待消息 - RefreshMsg refreshMsg2 = appClient.getMessageFuture().get(10, TimeUnit.SECONDS); - if (DataType.DataTypes.FIELD_LIST == refreshMsg2.payload().dataType()){ - list.add(decode3(refreshMsg2.payload().fieldList(),refreshMsg2.name())); - } + appClient.subscribeList(strings); // 根据itemName订阅 + // 等待消息 + List refreshMsgs = appClient.getMessagesFuture().get(10, TimeUnit.SECONDS); + for (RefreshMsg msg : refreshMsgs) { + list.add(decode3(msg.payload().fieldList(),msg.name())); } } } catch (Exception e) { @@ -270,6 +265,7 @@ public class MessageRetifiveController { } + @GetMapping("/getBSETopLoserList") public ServerResponse getBSETopLoserList() { @@ -282,13 +278,11 @@ public class MessageRetifiveController { RefreshMsg refreshMsg = appClient.getMessageFuture().get(10, TimeUnit.SECONDS);// 设置超时时间,例如10秒 if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()){ List strings = decode4(refreshMsg.payload().fieldList()); - for (String link : strings) { - appClient.subscribe(link); // 根据itemName订阅 + appClient.subscribeList(strings); // 根据itemName订阅 // 等待消息 - RefreshMsg refreshMsg2 = appClient.getMessageFuture().get(10, TimeUnit.SECONDS); - if (DataType.DataTypes.FIELD_LIST == refreshMsg2.payload().dataType()){ - list.add(decode3(refreshMsg2.payload().fieldList(),refreshMsg2.name())); - } + List refreshMsgs = appClient.getMessagesFuture().get(10, TimeUnit.SECONDS); + for (RefreshMsg msg : refreshMsgs) { + list.add(decode3(msg.payload().fieldList(),msg.name())); } } } catch (Exception e) { @@ -297,6 +291,8 @@ public class MessageRetifiveController { return ServerResponse.createBySuccess("操作成功",list); } + + private List decode4(FieldList fieldList) { List list = Lists.newArrayList(); String nextLink = "";