129 lines
4.9 KiB
Python
129 lines
4.9 KiB
Python
|
import sqlite3
|
||
|
import time
|
||
|
import logging
|
||
|
from datetime import datetime
|
||
|
import akshare as ak
|
||
|
from apscheduler.schedulers.background import BackgroundScheduler
|
||
|
import pandas as pd
|
||
|
|
||
|
class StockDataSystem:
    """Periodically fetch A-share spot quotes with akshare and store them in SQLite.

    Rows are appended to the ``stock_realtime`` table of the database file
    named by ``db_name``. Collection is driven by an APScheduler
    ``BackgroundScheduler`` and is skipped outside the trading windows
    checked by :meth:`collect_data`.
    """

    def __init__(self, db_name='stock_data.db'):
        """Set up logging, create the table if needed and build the scheduler.

        Args:
            db_name: Path of the SQLite database file.
        """
        self.db_name = db_name
        self.setup_logging()
        self.init_database()
        self.scheduler = BackgroundScheduler()

    def setup_logging(self):
        """Configure the shared ``StockDataSystem`` logger (INFO, stream output)."""
        self.logger = logging.getLogger('StockDataSystem')
        self.logger.setLevel(logging.INFO)
        # logging.getLogger returns the same object for every instance, so
        # only attach a handler once; otherwise each new StockDataSystem
        # would make every message print one more time.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def init_database(self):
        """Create the ``stock_realtime`` table and its (代码, timestamp) index."""
        conn = sqlite3.connect(self.db_name)
        try:
            # "with conn" only commits / rolls back the transaction; the
            # surrounding try/finally is what actually closes the connection.
            with conn:
                cursor = conn.cursor()
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS stock_realtime (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        序号 INTEGER,
                        代码 TEXT NOT NULL,
                        名称 TEXT NOT NULL,
                        最新价 REAL,
                        涨跌幅 REAL,
                        涨跌额 REAL,
                        成交量 REAL,
                        成交额 REAL,
                        振幅 REAL,
                        最高 REAL,
                        最低 REAL,
                        今开 REAL,
                        昨收 REAL,
                        量比 REAL,
                        换手率 REAL,
                        "市盈率-动态" REAL,
                        市净率 REAL,
                        总市值 REAL,
                        流通市值 REAL,
                        涨速 REAL,
                        "5分钟涨跌" REAL,
                        "60日涨跌幅" REAL,
                        年初至今涨跌幅 REAL,
                        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_code_time ON stock_realtime(代码, timestamp)')
        finally:
            conn.close()
        self.logger.info("数据库初始化完成")

    def fetch_stock_data(self):
        """Fetch the current spot quotes from akshare.

        Returns:
            The DataFrame returned by ``ak.stock_zh_a_spot_em()`` with rows
            lacking a '最新价' (latest price) value removed, or ``None`` when
            the fetch fails for any reason.
        """
        try:
            stock_zh_a_spot_df = ak.stock_zh_a_spot_em()
            # Drop rows that have no value in the '最新价' (latest price) column.
            df = stock_zh_a_spot_df.dropna(subset=['最新价'])
            return df
        except Exception as e:
            # Deliberately broad: this is the boundary to a remote data
            # source, and a failed fetch must not kill the scheduler job.
            self.logger.error(f"获取股票数据失败: {str(e)}")
            return None

    def store_data(self, df):
        """Append the rows of ``df`` to ``stock_realtime``, stamped with the local time.

        The caller's DataFrame is left unmodified. Database errors are logged
        rather than raised.

        Args:
            df: DataFrame whose columns match the table columns, or ``None``.
        """
        if df is None or df.empty:
            self.logger.warning("没有数据需要存储")
            return

        # assign() builds a new frame, so the caller's object (often a
        # dropna() result) is never written to.
        stamped = df.assign(timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        try:
            conn = sqlite3.connect(self.db_name)
            try:
                with conn:
                    stamped.to_sql('stock_realtime', conn, if_exists='append', index=False)
            finally:
                conn.close()
            self.logger.info(f"成功存储 {len(df)} 条记录")
        except (sqlite3.Error, pd.errors.DatabaseError) as e:
            # pandas wraps some SQL execution failures in its own DatabaseError.
            self.logger.error(f"存储数据失败: {str(e)}")

    @staticmethod
    def _is_trading_time(now):
        """Return True if ``now`` is a weekday within 09:30-11:30 or 13:00-15:00.

        ``now`` is compared as-is; NOTE(review): this presumably expects the
        exchange's local time - confirm the host timezone. Public holidays
        are not taken into account.
        """
        if now.weekday() >= 5:  # Saturday / Sunday
            return False

        def at(hour, minute):
            return now.replace(hour=hour, minute=minute, second=0, microsecond=0)

        return at(9, 30) <= now <= at(11, 30) or at(13, 0) <= now <= at(15, 0)

    def collect_data(self):
        """Fetch and store one snapshot if the current local time is a trading time."""
        if self._is_trading_time(datetime.now()):
            self.logger.info("开始收集数据...")
            df = self.fetch_stock_data()
            self.store_data(df)
        else:
            self.logger.info("当前不在开盘时间,跳过数据收集")

    def start_collection(self, interval_minutes=5):
        """Schedule :meth:`collect_data` every ``interval_minutes`` and start the scheduler.

        Calling this again replaces the existing job (for example to change
        the interval) instead of raising.

        Args:
            interval_minutes: Minutes between two collection runs.
        """
        self.scheduler.add_job(
            self.collect_data,
            'interval',
            minutes=interval_minutes,
            id='stock_data_collection',
            replace_existing=True,
        )
        if not self.scheduler.running:
            self.scheduler.start()
        self.logger.info(f"数据收集已启动,间隔为 {interval_minutes} 分钟")

    def stop_collection(self):
        """Shut the scheduler down; a no-op if it is not running."""
        if self.scheduler.running:
            self.scheduler.shutdown()
            self.logger.info("数据收集已停止")

    def query_stock_data(self, code, start_time=None, end_time=None):
        """Return stored rows for one stock code, newest first.

        Args:
            code: Value matched against the 代码 column.
            start_time: Optional inclusive lower bound on ``timestamp``.
            end_time: Optional inclusive upper bound on ``timestamp``.

        Returns:
            A DataFrame of matching rows ordered by ``timestamp`` descending,
            or ``None`` if the query fails.
        """
        query = "SELECT * FROM stock_realtime WHERE 代码 = ?"
        params = [code]
        if start_time:
            query += " AND timestamp >= ?"
            params.append(start_time)
        if end_time:
            query += " AND timestamp <= ?"
            params.append(end_time)
        query += " ORDER BY timestamp DESC"

        try:
            conn = sqlite3.connect(self.db_name)
            try:
                with conn:
                    return pd.read_sql_query(query, conn, params=params)
            finally:
                conn.close()
        except (sqlite3.Error, pd.errors.DatabaseError) as e:
            # pandas raises its own DatabaseError when executing the SQL fails.
            self.logger.error(f"查询数据失败: {str(e)}")
            return None
|