간단한 코드 예시

다음은 5개의 웹사이트에서 데이터를 가져오는 코드입니다:

순차 처리 코드:

import requests
import time

def fetch_data(url):
    print(f"{url} 데이터 요청 중...")
    response = requests.get(url)
    return response.text[:50]  # 결과의 처음 50자만 반환

def main():
    start_time = time.time()
    
    urls = [
        "https://www.google.com",
        "https://www.naver.com",
        "https://www.daum.net",
        "https://www.github.com",
        "https://www.python.org"
    ]
    
    results = []
    for url in urls:
        # 각 URL을 순차적으로 처리
        data = fetch_data(url)
        results.append(data)
    
    end_time = time.time()
    print(f"총 소요 시간: {end_time - start_time:.2f}초")
    print(f"가져온 데이터 수: {len(results)}")

if __name__ == "__main__":
    main()

병렬 처리 코드 (asyncio 사용):

import asyncio
import aiohttp
import time

async def fetch_data(url):
    print(f"{url} 데이터 요청 중...")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            text = await response.text()
            return text[:50]  # 결과의 처음 50자만 반환

async def main():
    start_time = time.time()
    
    urls = [
        "https://www.google.com",
        "https://www.naver.com",
        "https://www.daum.net",
        "https://www.github.com",
        "https://www.python.org"
    ]
    
    # 모든 URL을 동시에 처리
    tasks = [fetch_data(url) for url in urls]
    results = await asyncio.gather(*tasks)
    
    end_time = time.time()
    print(f"총 소요 시간: {end_time - start_time:.2f}초")
    print(f"가져온 데이터 수: {len(results)}")

if __name__ == "__main__":
    asyncio.run(main())

핵심 차이점

  1. 동시 실행: 순차 코드는 한 번에 하나의 URL만 처리하지만, 병렬 코드는 모든 URL에 대한 요청을 동시에 시작합니다.

  2. 대기 시간 중첩:
    • 순차 코드: 총 소요 시간 = URL1 처리 시간 + URL2 처리 시간 + … + URL5 처리 시간
    • 병렬 코드: 총 소요 시간 ≈ 가장 오래 걸리는 URL 처리 시간
  3. 성능 이점:
    • 5개 URL이 각각 1초씩 걸린다면:
      • 순차 코드: 약 5초 소요
      • 병렬 코드: 약 1초 소요 (5배 빠름)
  4. 코드 패턴:
    • asyncio.gather(*tasks): 여러 작업을 동시에 실행하는 핵심 패턴
    • 각 작업은 async def로 정의된 비동기 함수

이러한 기본 개념을 바탕으로, 이제 실제 트레이딩 봇에 적용된 복잡한 병렬 처리를 살펴보겠습니다.

기존 코드 (순차 처리 방식)

async def check_buy_signals(self):
    """매수 신호 체크 및 주문 실행"""
    logger.log_system("매수 신호 체크 시작")
    
    # 모니터링 종목 중 상위 50개만 체크
    symbols_to_check = MONITORED_SYMBOLS[:50]
    
    # 매수 신호 체크 및 주문 실행 - 순차적으로 처리
    for symbol in symbols_to_check:
        try:
            # 현재가 조회 - 한 번에 하나씩 처리 (약 2초 소요)
            symbol_info = await asyncio.wait_for(
                api_client.get_symbol_info(symbol),
                timeout=2.0
            )
            
            if not symbol_info or "current_price" not in symbol_info:
                continue
                
            # 매수 신호 확인
            signal_info = self.check_buy_signal(symbol)
            if not signal_info["has_signal"]:
                continue
                
            # 주문 처리 - 역시 순차적으로 처리
            await self.process_buy_order(
                symbol=symbol,
                signal_score=signal_info["score"],
                remaining_investment=remaining_investment,
                per_stock_amount=investment_info["per_stock_amount"]
            )
            
            # 주문 간 간격 추가
            await asyncio.sleep(1.0)
        except Exception as e:
            logger.log_error(e, f"{symbol} 매수 신호 체크 중 오류")
    
    logger.log_system("매수 신호 체크 종료")

문제점:

  • 50개 종목 각각에 대해 API 호출 시 대기 시간이 누적됨
  • 종목당 평균 3초 소요 시 총 150초 이상 소요
  • 종목별로 순차 처리하므로 모든 종목을 동시에 분석할 수 없음

개선된 코드 (병렬 처리 방식)

async def check_buy_signals(self):
    """매수 신호 체크 및 주문 실행 - 병렬 처리 버전"""
    logger.log_system("매수 신호 체크 시작")
    
    # 모니터링 종목 중 상위 50개만 체크
    symbols_to_check = MONITORED_SYMBOLS[:50]
    
    # 1. 모든 종목의 현재가를 병렬로 조회
    async def fetch_symbol_info(symbol):
        try:
            return await asyncio.wait_for(
                api_client.get_symbol_info(symbol),
                timeout=2.0
            )
        except Exception as e:
            return {"symbol": symbol, "error": True}
    
    # 모든 API 호출을 동시에 실행
    symbol_info_tasks = [fetch_symbol_info(symbol) for symbol in symbols_to_check]
    symbol_info_results = await asyncio.gather(*symbol_info_tasks)
    
    # 2. 유효한 결과에 대해 매수 신호 체크 (병렬 처리)
    valid_symbols = []
    symbol_price_map = {}
    
    for info in symbol_info_results:
        if info and "error" not in info and "current_price" in info:
            symbol = info["symbol"]
            symbol_price_map[symbol] = info
            valid_symbols.append(symbol)
    
    # 3. 매수 신호 체크 및 주문 처리 - 병렬 실행
    async def process_symbol(symbol):
        try:
            signal_info = self.check_buy_signal(symbol)
            if not signal_info["has_signal"]:
                return None
                
            return await self.process_buy_order(
                symbol=symbol,
                signal_score=signal_info["score"],
                remaining_investment=investment_info["total_amount"],
                per_stock_amount=investment_info["per_stock_amount"]
            )
        except Exception:
            return None
    
    order_tasks = [process_symbol(symbol) for symbol in valid_symbols]
    order_results = await asyncio.gather(*order_tasks)
    
    logger.log_system("매수 신호 체크 종료")

성능 개선 효과

  • 기존 순차 처리: 약 150초 소요 (50개 종목 x 평균 3초)
  • 병렬 처리 적용 후: 약 25초로 단축 (약 83% 시간 감소)
  • 최대 응답 시간: 가장 느린 API 응답 시간 + 약간의 오버헤드

핵심 개선 포인트

  1. API 호출 병렬화: 모든 종목의 현재가를 한 번에 요청하여 네트워크 대기 시간 중복 제거
    # 모든 API 호출을 동시에 실행
    symbol_info_tasks = [fetch_symbol_info(symbol) for symbol in symbols_to_check]
    symbol_info_results = await asyncio.gather(*symbol_info_tasks)
    
  2. 신호 분석 병렬화: 종목별 신호 분석을 동시에 수행
    # 모든 신호 분석을 동시에 수행
    order_tasks = [process_symbol(symbol) for symbol in valid_symbols]
    order_results = await asyncio.gather(*order_tasks)
    
  3. 단계별 처리: 현재가 조회 → 신호 분석 → 주문 처리를 단계별로 나누어 각 단계를 병렬화

주의사항

  • API 호출 제한을 고려해야 함 (초당/분당 요청 수 제한)
  • 동시 처리 수가 많을 경우 메모리 사용량 증가
  • 오류 처리를 철저히 하여 일부 종목 문제가 전체에 영향을 미치지 않도록 함

결론

Python의 asyncio를 활용한 병렬 처리를 통해 주식 트레이딩 봇의 매수 신호 체크 시간을 150초에서 25초로 대폭 단축이 예상됩니다. 시간 효율성이 중요한 트레이딩 환경에서 이러한 최적화는 더 빠른 시장 대응과 더 많은 기회 포착을 가능하게 합니다.