# Python asyncio로 트레이딩 봇 처리 시간 83% 단축하기
간단한 코드 예시
다음은 5개의 웹사이트에서 데이터를 가져오는 코드입니다:
순차 처리 코드:
import requests
import time
def fetch_data(url):
print(f"{url} 데이터 요청 중...")
response = requests.get(url)
return response.text[:50] # 결과의 처음 50자만 반환
def main():
start_time = time.time()
urls = [
"https://www.google.com",
"https://www.naver.com",
"https://www.daum.net",
"https://www.github.com",
"https://www.python.org"
]
results = []
for url in urls:
# 각 URL을 순차적으로 처리
data = fetch_data(url)
results.append(data)
end_time = time.time()
print(f"총 소요 시간: {end_time - start_time:.2f}초")
print(f"가져온 데이터 수: {len(results)}")
if __name__ == "__main__":
main()
병렬 처리 코드 (asyncio 사용):
import asyncio
import aiohttp
import time
async def fetch_data(url):
print(f"{url} 데이터 요청 중...")
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
text = await response.text()
return text[:50] # 결과의 처음 50자만 반환
async def main():
start_time = time.time()
urls = [
"https://www.google.com",
"https://www.naver.com",
"https://www.daum.net",
"https://www.github.com",
"https://www.python.org"
]
# 모든 URL을 동시에 처리
tasks = [fetch_data(url) for url in urls]
results = await asyncio.gather(*tasks)
end_time = time.time()
print(f"총 소요 시간: {end_time - start_time:.2f}초")
print(f"가져온 데이터 수: {len(results)}")
if __name__ == "__main__":
asyncio.run(main())
핵심 차이점
-
동시 실행: 순차 코드는 한 번에 하나의 URL만 처리하지만, 병렬 코드는 모든 URL에 대한 요청을 동시에 시작합니다.
- 대기 시간 중첩:
- 순차 코드: 총 소요 시간 = URL1 처리 시간 + URL2 처리 시간 + … + URL5 처리 시간
- 병렬 코드: 총 소요 시간 ≈ 가장 오래 걸리는 URL 처리 시간
- 성능 이점:
- 5개 URL이 각각 1초씩 걸린다면:
- 순차 코드: 약 5초 소요
- 병렬 코드: 약 1초 소요 (5배 빠름)
- 5개 URL이 각각 1초씩 걸린다면:
- 코드 패턴:
asyncio.gather(*tasks)
: 여러 작업을 동시에 실행하는 핵심 패턴- 각 작업은
async def
로 정의된 비동기 함수
이러한 기본 개념을 바탕으로, 이제 실제 트레이딩 봇에 적용된 복잡한 병렬 처리를 살펴보겠습니다.
기존 코드 (순차 처리 방식)
async def check_buy_signals(self):
"""매수 신호 체크 및 주문 실행"""
logger.log_system("매수 신호 체크 시작")
# 모니터링 종목 중 상위 50개만 체크
symbols_to_check = MONITORED_SYMBOLS[:50]
# 매수 신호 체크 및 주문 실행 - 순차적으로 처리
for symbol in symbols_to_check:
try:
# 현재가 조회 - 한 번에 하나씩 처리 (약 2초 소요)
symbol_info = await asyncio.wait_for(
api_client.get_symbol_info(symbol),
timeout=2.0
)
if not symbol_info or "current_price" not in symbol_info:
continue
# 매수 신호 확인
signal_info = self.check_buy_signal(symbol)
if not signal_info["has_signal"]:
continue
# 주문 처리 - 역시 순차적으로 처리
await self.process_buy_order(
symbol=symbol,
signal_score=signal_info["score"],
remaining_investment=remaining_investment,
per_stock_amount=investment_info["per_stock_amount"]
)
# 주문 간 간격 추가
await asyncio.sleep(1.0)
except Exception as e:
logger.log_error(e, f"{symbol} 매수 신호 체크 중 오류")
logger.log_system("매수 신호 체크 종료")
문제점:
- 50개 종목 각각에 대해 API 호출 시 대기 시간이 누적됨
- 종목당 평균 3초 소요 시 총 150초 이상 소요
- 종목별로 순차 처리하므로 모든 종목을 동시에 분석할 수 없음
개선된 코드 (병렬 처리 방식)
async def check_buy_signals(self):
"""매수 신호 체크 및 주문 실행 - 병렬 처리 버전"""
logger.log_system("매수 신호 체크 시작")
# 모니터링 종목 중 상위 50개만 체크
symbols_to_check = MONITORED_SYMBOLS[:50]
# 1. 모든 종목의 현재가를 병렬로 조회
async def fetch_symbol_info(symbol):
try:
return await asyncio.wait_for(
api_client.get_symbol_info(symbol),
timeout=2.0
)
except Exception as e:
return {"symbol": symbol, "error": True}
# 모든 API 호출을 동시에 실행
symbol_info_tasks = [fetch_symbol_info(symbol) for symbol in symbols_to_check]
symbol_info_results = await asyncio.gather(*symbol_info_tasks)
# 2. 유효한 결과에 대해 매수 신호 체크 (병렬 처리)
valid_symbols = []
symbol_price_map = {}
for info in symbol_info_results:
if info and "error" not in info and "current_price" in info:
symbol = info["symbol"]
symbol_price_map[symbol] = info
valid_symbols.append(symbol)
# 3. 매수 신호 체크 및 주문 처리 - 병렬 실행
async def process_symbol(symbol):
try:
signal_info = self.check_buy_signal(symbol)
if not signal_info["has_signal"]:
return None
return await self.process_buy_order(
symbol=symbol,
signal_score=signal_info["score"],
remaining_investment=investment_info["total_amount"],
per_stock_amount=investment_info["per_stock_amount"]
)
except Exception:
return None
order_tasks = [process_symbol(symbol) for symbol in valid_symbols]
order_results = await asyncio.gather(*order_tasks)
logger.log_system("매수 신호 체크 종료")
성능 개선 효과
- 기존 순차 처리: 약 150초 소요 (50개 종목 x 평균 3초)
- 병렬 처리 적용 후: 약 25초로 단축 (약 83% 시간 감소)
- 최대 응답 시간: 가장 느린 API 응답 시간 + 약간의 오버헤드
핵심 개선 포인트
- API 호출 병렬화: 모든 종목의 현재가를 한 번에 요청하여 네트워크 대기 시간 중복 제거
# 모든 API 호출을 동시에 실행 symbol_info_tasks = [fetch_symbol_info(symbol) for symbol in symbols_to_check] symbol_info_results = await asyncio.gather(*symbol_info_tasks)
- 신호 분석 병렬화: 종목별 신호 분석을 동시에 수행
# 모든 신호 분석을 동시에 수행 order_tasks = [process_symbol(symbol) for symbol in valid_symbols] order_results = await asyncio.gather(*order_tasks)
- 단계별 처리: 현재가 조회 → 신호 분석 → 주문 처리를 단계별로 나누어 각 단계를 병렬화
주의사항
- API 호출 제한을 고려해야 함 (초당/분당 요청 수 제한)
- 동시 처리 수가 많을 경우 메모리 사용량 증가
- 오류 처리를 철저히 하여 일부 종목 문제가 전체에 영향을 미치지 않도록 함
결론
Python의 asyncio를 활용한 병렬 처리를 통해 주식 트레이딩 봇의 매수 신호 체크 시간을 150초에서 25초로 대폭 단축이 예상됩니다. 시간 효율성이 중요한 트레이딩 환경에서 이러한 최적화는 더 빠른 시장 대응과 더 많은 기회 포착을 가능하게 합니다.
댓글