My earlier code for batch-downloading financial data took about 24 hours to fetch roughly 100,000 rows, which was far too slow. After some digging I found that the server does not kick clients that make blocking requests from multiple threads, so the download is now much faster.
In addition, the example code now supports resuming interrupted downloads. The approach is rough and aims only at getting the feature working; if I ever package this into an app I will clean it up.
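A minimal sketch of how the resume feature could work, assuming the cache CSV (`all_base_info_in.csv`) contains `code`, `year`, and `quarter` columns; `load_progress` and `already_done` are hypothetical helper names for illustration, not functions from the original code:

```python
import os
import pandas as pd

CACHE_FILE = 'all_base_info_in.csv'  # hypothetical cache path for illustration

def load_progress(path=CACHE_FILE):
    """Load previously downloaded rows so finished (code, year, quarter)
    combinations can be skipped on the next run."""
    if os.path.exists(path):
        return pd.read_csv(path)
    # No cache yet: start from an empty frame with the expected columns
    return pd.DataFrame(columns=['code', 'year', 'quarter'])

def already_done(df, code, year, quarter):
    """True if this (code, year, quarter) combination is already cached."""
    hit = df[(df['code'] == code) & (df['year'] == year) & (df['quarter'] == quarter)]
    return not hit.empty
```

On each run, load the cache once and skip any combination for which `already_done` returns `True`.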
Create the thread list:

```python
threads = []
```
Append a thread to the list (one `Thread` per download task):

```python
from threading import Thread

threads.append(Thread(target=self.down_data, args=(stock, year, quarter)))
```
Iterate over the list and start every thread:

```python
for thread in threads:
    thread.start()
```
Wait for all threads to finish:

```python
for thread in threads:
    thread.join()
```
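The four steps above combine into a minimal runnable sketch; the `down_data` worker here is a stand-in that just records its arguments instead of downloading anything:

```python
from threading import Thread

results = []

def down_data(stock, year, quarter):
    # Placeholder for the real network download; just records its arguments
    results.append((stock, year, quarter))

# Build the thread list: one worker per quarter of one stock
threads = []
for quarter in (1, 2, 3, 4):
    threads.append(Thread(target=down_data, args=('sz.000001', 2020, quarter)))

# Start all threads, then wait for every one of them to finish
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(len(results))  # → 4, all workers have finished after join()
```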
Example:

```python
def run(self, stocks=('sz.000001',)):
    '''Iterate over years and quarters, passing each combination to down_data once.'''
    count = 0
    print('Downloading the required financial data...')
    threads = []
    for stock in set(stocks):
        _df = self.data_df.loc[self.data_df['code'] == stock, :]
        for year in self.years:
            if year == 2021:
                self.down_data(stock, year, 1)
            else:
                for quarter in self.quarters:
                    # Resume support: skip combinations already in the cache
                    if (not _df.empty
                            and year in _df['year'].unique()
                            and quarter in _df['quarter'].unique()):
                        print('Data for {} {} Q{} already exists!'.format(stock, year, quarter))
                        continue
                    # Checkpoint: flush to disk every 500 queued downloads
                    if count == 500:
                        self.data_df.to_csv('all_base_info_in.csv')
                        print('Partial write to file succeeded!')
                        count = 0
                    threads.append(Thread(target=self.down_data, args=(stock, year, quarter)))
                    count += 1
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    # Convert every data column after the first two to float64
    self.data_df[self.data_df.columns[2:]] = self.data_df[self.data_df.columns[2:]].apply(
        lambda x: x.astype('float64'))
    return self.data_df
```
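One thread per (stock, year, quarter) combination can create hundreds of threads at once. As an alternative sketch, not part of the original code, `concurrent.futures.ThreadPoolExecutor` caps the number of concurrent workers; `down_data` here is again a stand-in for the real download:

```python
from concurrent.futures import ThreadPoolExecutor

def down_data(stock, year, quarter):
    # Stand-in for the real download; returns the combination it handled
    return (stock, year, quarter)

tasks = [('sz.000001', y, q) for y in (2019, 2020) for q in (1, 2, 3, 4)]

# Cap concurrency at 8 workers instead of one thread per combination
with ThreadPoolExecutor(max_workers=8) as pool:
    futures = [pool.submit(down_data, *t) for t in tasks]
    done = [f.result() for f in futures]

print(len(done))  # → 8 combinations processed
```

The pool reuses a fixed set of threads, so memory use stays bounded no matter how many stock/year/quarter combinations are queued.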