money control代码提交

This commit is contained in:
Achilles
2024-01-03 21:20:05 +08:00
parent b1b60bcce6
commit 7f88f3be02
15 changed files with 686 additions and 1 deletions

View File

@@ -0,0 +1,66 @@
package cn.stock.market.infrastructure.db.po;
import java.lang.Integer;
import java.lang.String;
import java.util.Date;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.hibernate.annotations.DynamicInsert;
import org.hibernate.annotations.DynamicUpdate;
/**
* MoneyStockPO
*
* @author rplees
* @email rplees.i.ly@gmail.com
* @created 2024/01/03
*/
@SuperBuilder
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@DynamicInsert
@DynamicUpdate
@Table(
name = "money_stock"
)
public class MoneyStockPO {
/**
* 主键 */
@Id
@GeneratedValue(
strategy = javax.persistence.GenerationType.IDENTITY
)
Integer id;
/**
* 股票名称 */
String stockName;
/**
* BSE or NSE */
String stockType;
/**
* money Control的id */
String moneyScId;
/**
* 自有self_url */
String selfUrl;
/**
* 细节url */
String detailUrl;
/**
* 保存时间 */
Date saveTime;
}

View File

@@ -0,0 +1,15 @@
package cn.stock.market.infrastructure.db.repo;
import cn.stock.market.infrastructure.db.po.MoneyStockPO;
import com.rp.spring.jpa.GenericJpaRepository;
import java.lang.Integer;
/**
* MoneyStockRepo
*
* @author rplees
* @email rplees.i.ly@gmail.com
* @created 2024/01/03
*/
public interface MoneyStockRepo extends GenericJpaRepository<MoneyStockPO, Integer> {
}

View File

@@ -0,0 +1,260 @@
package cn.stock.market.infrastructure.job;
import cn.hutool.core.collection.CollectionUtil;
import cn.stock.market.domain.basic.entity.MoneyStock;
import cn.stock.market.domain.basic.repository.MoneyStockRepository;
import cn.stock.market.infrastructure.db.po.QMoneyStockPO;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClients;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author gs
* @date 2024/1/2 17:31
*/
@RestController
@Slf4j
public class MoneyScraper {
private static final int MAX_RETRY_ATTEMPTS = 10;
private static final int NUM_THREADS = 5;
@Autowired
private MoneyStockRepository moneyStockRepository;
@GetMapping("testScraperGetMoneyControlStock")
public void schedule(){
List<String> letters = new ArrayList<>();
for (char c = 'A'; c <= 'Z'; c++) {
letters.add(String.valueOf(c));
}
letters.add("others");
int tasksPerThread = (int) Math.ceil((double) letters.size() / NUM_THREADS);
// 手动创建线程池
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
NUM_THREADS, NUM_THREADS,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
List<CompletableFuture<List<String>>> futures = new ArrayList<>();
for (int i = 0; i < NUM_THREADS; i++) {
int startIndex = i * tasksPerThread;
int endIndex = Math.min((i + 1) * tasksPerThread, letters.size());
List<String> letterRange = letters.subList(startIndex, endIndex);
CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(() -> processLetters(letterRange), executorService);
futures.add(future);
}
List<String> results = new ArrayList<>();
for (CompletableFuture<List<String>> future : futures) {
try {
results.addAll(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executorService.shutdown();
log.error("All threads completed. Results:");
for (int i = 0; i < results.size(); i++) {
log.error("Thread " + (i + 1) + " processed letters: " + results.get(i));
}
}
private List<String> sendHttpRequest(String url, HttpClient httpClient, String letter) throws IOException {
Document document = fetchStockDetails(url);
extractExchangeDetails(document);
List<String> result = new ArrayList<>();
result.add("Thread " + Thread.currentThread().getName() + " processed letters: " + letter);
return result;
}
private List<String> processLetters(List<String> letterRange) {
log.error("Thread " + Thread.currentThread().getName() + " is processing letters: " + letterRange);
String urlBase = "https://www.moneycontrol.com/india/stockpricequote/%s";
HttpClient httpClient = HttpClients.createDefault();
List<String> results = new ArrayList<>();
for (String letter : letterRange) {
String detailUrl = String.format(urlBase, letter);
log.error("Thread " + Thread.currentThread().getName() + ",请求的url:" + detailUrl);
// 重试逻辑
int maxAttempts = 10;
int attempt = 1;
while (attempt <= maxAttempts) {
try {
List<String> response = sendHttpRequest(detailUrl, httpClient, letter) ;
results.addAll(response);
break; // 如果成功则跳出循环
} catch (IOException | RuntimeException e) {
// 处理异常的逻辑
e.printStackTrace();
log.error("Attempt " + attempt + " failed. Retrying...");
attempt++;
try {
Thread.sleep(1000); // 休眠1秒
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
}
return results;
}
public static Document fetchStockDetails(String url) {
return fetchDocumentWithRetry(url);
}
public static Document fetchCompanyDetails(String url) {
return fetchDocumentWithRetry(url);
}
private static Document fetchDocumentWithRetry(String url) {
HttpClient httpClient = HttpClients.createDefault();
int retryAttempts = 0;
while (retryAttempts < MAX_RETRY_ATTEMPTS) {
try {
HttpGet request = new HttpGet(url);
request.setHeader("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36");
HttpResponse response = httpClient.execute(request);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode >= 200 && statusCode < 300) {
return Jsoup.parse(response.getEntity().getContent(), null, url);
} else {
retryAttempts++;
log.error("HTTP request failed with status code: " + statusCode);
}
} catch (IOException e) {
retryAttempts++;
log.error(Thread.currentThread().getName()+",Exception during HTTP request: " +retryAttempts);
}
}
return null; // Retry attempts exhausted
}
public void extractExchangeDetails(Document soup) {
Elements companies = soup.select("table.pcq_tbl.MT10");
for (Element company : companies) {
Elements elements = company.select("tr > td > a");
for (Element element : elements) {
String textContent = element.text().trim();
String linkAttribute = element.attr("href");
log.info(Thread.currentThread().getName()+",Text Content: " + textContent + ", Link Attribute: " + linkAttribute);
Document soup2 = fetchCompanyDetails(linkAttribute);
if (soup2 != null) {
Element comIdInput = soup2.selectFirst("input[id=ap_sc_id]");
String companyCodeId = "";
if (comIdInput != null) {
companyCodeId = comIdInput.val();
log.info(Thread.currentThread().getName()+",the stockName: " + textContent + ", THE input id: " + companyCodeId);
} else {
log.error(Thread.currentThread().getName()+" No <input> with id='ap_sc_id' found on the website.");
}
if (soup2 != null) {
Element ulElement = soup2.selectFirst("ul[id=nseBseTab]");
if (ulElement != null) {
for (Element aElement : ulElement.select("a")) {
String exchangeValue = aElement.text().trim();
if ("BSE".equals(exchangeValue) || "NSE".equals(exchangeValue)) {
log.info(Thread.currentThread().getName()+",stockName: " + textContent + ", self_link: " + linkAttribute +
", the exchange Value: " + exchangeValue);
MoneyStock build = MoneyStock.builder().stockName(textContent).stockType(exchangeValue.toLowerCase(Locale.ROOT))
.detailUrl(String.format("https://priceapi.moneycontrol.com/pricefeed/%s/equitycash/%s", exchangeValue.toLowerCase(), companyCodeId))
.selfUrl(linkAttribute)
.moneyScId(companyCodeId).saveTime(new Date()).build();
List<MoneyStock> all = moneyStockRepository.findAll(QMoneyStockPO.moneyStockPO.stockName.eq(textContent), QMoneyStockPO.moneyStockPO.stockType.eq(exchangeValue.toLowerCase(Locale.ROOT)));
if(CollectionUtil.isEmpty(all)){
moneyStockRepository.save(build);
}
} else {
log.error(Thread.currentThread().getName()+", stockName: " + textContent + ", self_link: " + linkAttribute +
" is not a valid exchange type");
MoneyStock build = MoneyStock.builder().stockName(textContent)
.detailUrl(String.format("https://priceapi.moneycontrol.com/pricefeed/%s/equitycash/%s", exchangeValue.toLowerCase(), companyCodeId))
.selfUrl(linkAttribute)
.moneyScId(companyCodeId).saveTime(new Date()).build();
List<MoneyStock> all = moneyStockRepository.findAll(QMoneyStockPO.moneyStockPO.stockName.eq(textContent));
if(CollectionUtil.isEmpty(all)){
moneyStockRepository.save(build);
}
}
}
} else {
log.info("stockName: " + textContent + ", self_link: " + linkAttribute +
" has no current exchange types");
MoneyStock build = MoneyStock.builder().stockName(textContent).selfUrl(linkAttribute)
.moneyScId(companyCodeId).saveTime(new Date()).build();
List<MoneyStock> all = moneyStockRepository.findAll(QMoneyStockPO.moneyStockPO.stockName.eq(textContent));
if(CollectionUtil.isEmpty(all)){
moneyStockRepository.save(build);
}
}
}
} else {
log.info(Thread.currentThread().getName()+",stockName: " + textContent + ", self_link: " + linkAttribute +
" cannot find corresponding stock id");
MoneyStock build = MoneyStock.builder().stockName(textContent).selfUrl(linkAttribute)
.saveTime(new Date()).build();
List<MoneyStock> all = moneyStockRepository.findAll(QMoneyStockPO.moneyStockPO.stockName.eq(textContent));
if(CollectionUtil.isEmpty(all)){
moneyStockRepository.save(build);
}
}
}
}
}
}