import asyncio
import logging
import math
from datetime import datetime, timezone, timedelta

import httpx
import pandas as pd

logging.getLogger("httpx").setLevel(logging.WARNING)

HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/122.0.0.0 Safari/537.36"
    ),
    # A plausible referring page for the quote API.
    "Referer": "https://quote.eastmoney.com/center/gridlist.html#fund_etf",
    "Accept": "application/json, text/plain, */*",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Connection": "keep-alive",
}

# Per-request timeout (seconds) applied to every call against the quote API.
_REQUEST_TIMEOUT = 10

# Raw API field code -> column name used in the returned DataFrame.
_FIELD_NAMES = {
    "f12": "代码",
    "f14": "名称",
    "f2": "最新价",
    "f4": "涨跌额",
    "f3": "涨跌幅",
    "f5": "成交量",
    "f6": "成交额",
    "f7": "振幅",
    "f17": "开盘价",
    "f15": "最高价",
    "f16": "最低价",
    "f18": "昨收",
    "f8": "换手率",
    "f10": "量比",
    "f30": "现手",
    "f31": "买一",
    "f32": "卖一",
    "f33": "委比",
    "f34": "外盘",
    "f35": "内盘",
    "f62": "主力净流入-净额",
    "f184": "主力净流入-净占比",
    "f66": "超大单净流入-净额",
    "f69": "超大单净流入-净占比",
    "f72": "大单净流入-净额",
    "f75": "大单净流入-净占比",
    "f78": "中单净流入-净额",
    "f81": "中单净流入-净占比",
    "f84": "小单净流入-净额",
    "f87": "小单净流入-净占比",
    "f38": "最新份额",
    "f21": "流通市值",
    "f20": "总市值",
    "f402": "基金折价率",
    "f441": "IOPV实时估值",
    "f297": "数据日期",
    "f124": "更新时间",
}

# Columns kept in the result, in output order.
_OUTPUT_COLUMNS = [
    "代码",
    "名称",
    "最新价",
    "IOPV实时估值",
    "基金折价率",
    "涨跌额",
    "涨跌幅",
    "成交量",
    "成交额",
    "开盘价",
    "最高价",
    "最低价",
    "昨收",
    "振幅",
    "换手率",
    "量比",
    "委比",
    "外盘",
    "内盘",
    "主力净流入-净额",
    "主力净流入-净占比",
    "超大单净流入-净额",
    "超大单净流入-净占比",
    "大单净流入-净额",
    "大单净流入-净占比",
    "中单净流入-净额",
    "中单净流入-净占比",
    "小单净流入-净额",
    "小单净流入-净占比",
    "现手",
    "买一",
    "卖一",
    "最新份额",
    "流通市值",
    "总市值",
    "数据日期",
    "更新时间",
]

# Every output column except the identifiers and the two timestamp columns
# is coerced to a numeric dtype.
_NON_NUMERIC_COLUMNS = ("代码", "名称", "数据日期", "更新时间")
_NUMERIC_COLUMNS = [
    name for name in _OUTPUT_COLUMNS if name not in _NON_NUMERIC_COLUMNS
]


def _extract_rows(payload) -> list:
    """Return the ``data.diff`` record list of an API payload, or ``[]``.

    Tolerates a payload whose ``data`` member is missing or ``null`` instead
    of raising ``AttributeError`` on ``None.get``.
    """
    data = payload.get("data") if isinstance(payload, dict) else None
    if not isinstance(data, dict):
        return []
    return data.get("diff") or []


async def fetch_page(client: httpx.AsyncClient, url: str, params: dict) -> pd.DataFrame:
    """Fetch one result page and return its records as a DataFrame.

    This is best-effort: any failure (transport error, non-2xx status,
    undecodable body) is reported with ``print`` and an empty DataFrame is
    returned, so one bad page does not abort the whole download.

    Args:
        client: Open ``httpx.AsyncClient`` used to issue the request.
        url: Endpoint URL.
        params: Query parameters; ``params["pn"]`` is the page number and is
            echoed in the error message.

    Returns:
        A DataFrame built from the page's ``data.diff`` records, or an empty
        DataFrame when the page has no records or the request failed.
    """
    try:
        resp = await client.get(url, params=params, timeout=_REQUEST_TIMEOUT)
        # Surface HTTP errors explicitly rather than as a JSON decode failure.
        resp.raise_for_status()
        rows = _extract_rows(resp.json())
        if rows:
            return pd.DataFrame(rows)
    except Exception as e:  # deliberate best-effort boundary: report and go on
        print(f"Error on page {params.get('pn')}: {e}")
    return pd.DataFrame()


def _tidy_frame(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Rename raw field codes, select the output columns and fix dtypes."""
    tidy = raw_df.rename(columns=_FIELD_NAMES)
    # Raises KeyError if the API omitted an expected field.
    tidy = tidy[_OUTPUT_COLUMNS].copy()

    # Non-numeric cells become NaN via errors="coerce".
    for name in _NUMERIC_COLUMNS:
        tidy[name] = pd.to_numeric(tidy[name], errors="coerce")

    tidy["数据日期"] = pd.to_datetime(
        tidy["数据日期"], format="%Y%m%d", errors="coerce"
    )
    # Parsed as seconds since the epoch, then labelled UTC and converted to
    # Asia/Shanghai.
    tidy["更新时间"] = (
        pd.to_datetime(tidy["更新时间"], unit="s", errors="coerce")
        .dt.tz_localize("UTC")
        .dt.tz_convert("Asia/Shanghai")
    )
    return tidy


async def fund_etf_spot_em_async() -> pd.DataFrame:
    """Download the full ETF spot-quote table and return it as a DataFrame.

    The first page is requested on its own to learn the total record count
    and the page size the server actually returned. The remaining pages are
    then fetched concurrently, and the first page's records are reused
    rather than downloaded a second time.

    Returns:
        A DataFrame with the columns listed in ``_OUTPUT_COLUMNS``, or an
        empty DataFrame when the first page contains no records. Pages after
        the first that fail to download are skipped (see ``fetch_page``).

    Raises:
        Exception: Errors from the first request (transport failure,
            undecodable JSON, missing ``total``) propagate to the caller.
        KeyError: If the response lacks one of the expected fields.
    """
    url = "https://88.push2.eastmoney.com/api/qt/clist/get"
    params = {
        "pn": "1",
        "pz": "100",
        "po": "1",
        "np": "1",
        "ut": "bd1d9ddb04089700cf9c27f6f7426281",
        "fltt": "2",
        "invt": "2",
        "wbp2u": "|0|0|0|web",
        "fid": "f12",
        "fs": "b:MK0021,b:MK0022,b:MK0023,b:MK0024,b:MK0827",
        "fields": (
            "f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,"
            "f12,f13,f14,f15,f16,f17,f18,f20,f21,"
            "f23,f24,f25,f22,f11,f30,f31,f32,f33,"
            "f34,f35,f38,f62,f63,f64,f65,f66,f69,"
            "f72,f75,f78,f81,f84,f87,f115,f124,f128,"
            "f136,f152,f184,f297,f402,f441"
        ),
    }

    async with httpx.AsyncClient(headers=HEADERS, http2=True) as client:
        # Fetch the first page to determine the total number of pages.
        resp = await client.get(url, params=params, timeout=_REQUEST_TIMEOUT)
        data_json = resp.json()
        first_rows = _extract_rows(data_json)
        if not first_rows:
            return pd.DataFrame()

        total = data_json["data"]["total"]
        # Use the number of rows actually returned as the page size, because
        # the server may not honour the requested "pz".
        per_page = len(first_rows)
        total_pages = math.ceil(total / per_page)

        # Page 1 is already in hand; only request pages 2..N.
        tasks = [
            fetch_page(client, url, {**params, "pn": str(page)})
            for page in range(2, total_pages + 1)
        ]
        later_pages = await asyncio.gather(*tasks)

    frames = [pd.DataFrame(first_rows)]
    frames.extend(df for df in later_pages if not df.empty)
    temp_df = pd.concat(frames, ignore_index=True)
    return _tidy_frame(temp_df)


# Synchronous wrapper for callers that are not running inside an event loop.
def fund_etf_spot_em() -> pd.DataFrame:
    """Run ``fund_etf_spot_em_async`` to completion via ``asyncio.run``.

    ``asyncio.run`` cannot be called from a thread that already has a
    running event loop; such callers should await the async function.
    """
    return asyncio.run(fund_etf_spot_em_async())


if __name__ == "__main__":
    df = fund_etf_spot_em()
    print(df.head())