본문 바로가기

SearchDeveloper/SpringBoot

[JPA] QueryDSL stream DB connection 에러 해결 과정 ② - @Transactional

버전

Spring Boot 2.7.4

QueryDSL 5.0.0

MySQL 5.7

이전 글

https://elsboo.tistory.com/40

 

[JPA] QueryDSL stream DB connection 에러 해결 과정 ①

버전 Spring Boot 2.7.4 QueryDSL 5.0.0 MySQL 5.7 DB 데이터를 가공해 카프카로 전송하는 애플리케이션을 만들면서 약 1주간 많은 난관에 봉착했고 많은 삽질을 했다. 어떤 에러가 있었고 어떻게 해결하였는

elsboo.tistory.com

② stream 데이터 사용 좀 해보려는데 forEach() 에서 Operation not allowed after ResultSet closed 에러…

결론

@Transactional 가 붙은 메소드는 메소드 종료 시 DB connection 과 stream 도 같이 close 된다.

에러 현상

DB 에서 데이터 추출하는 부분 (Extract), 추출한 데이터를 가공 (Transform) 하는 부분의 분리를 위해 stream 으로 데이터를 가져오는 부분은 메소드로 따로 뺐다.

TableARespository.java

/**
   데이터 가져오기 (Extract)
**/
@Transactional
public Stream<TableA> extract() {   
    return jpaQueryFactory
           .selectFrom(tableA)
           .stream()
    ;
}

 

Transformer.java

/** 
   데이터 가공하기 (Transform)
**/
public void transform() {
    try (Stream<RecruitSearchSelect> stream = tableARepository.extract()) {
        stream.forEach(tableA-> {
            String id = tableA.getId();
                    // 데이터 가공
                    tableBRepository.find(id);
                    tableCRepository.find(id);
                    tableDRepository.find(id);
                    // ...
        });
    }
}

 

그런데 stream.forEach() 부분에서 아래 에러 로그를 뱉은 후 프로그램이 종료되었다.

[WARN] SQL Error: 0, SQLState: S1000
[ERROR] Operation not allowed after ResultSet closed
[ERROR] could not advance using next()

원인 파악

Operation not allowed after ResultSet closed 는 JDBC에서 쿼리 결과를 담는 ResultSet 이 close 됐음에도 접근할 때 나는 에러이고

could not advance using next() 은 java stream 이 close 됐음에도 접근할 때 나는 에러이다.

그럼 stream 을 반환하는 tableARepository.extract() 에 문제가 있나보다 싶어서 @Transactional 구현부 코드를 확인해보았다.

Multi datasource 에서 Transaction 관리자로 JpaTransactionManager 를 선언하므로 JpaTransactionManager 를 시작점으로 삼았다.

트랜잭션 시작

JpaTransactionManager.java

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    JpaTransactionObject txObject = (JpaTransactionObject) transaction;
    // ...
    EntityManager em = txObject.getEntityManagerHolder().getEntityManager();

    // Delegate to JpaDialect for actual transaction begin.
    int timeoutToUse = determineTimeout(definition);
    Object transactionData = getJpaDialect().beginTransaction(em,
            new JpaTransactionDefinition(definition, timeoutToUse, txObject.isNewEntityManagerHolder()));
    // ...
}

트랜잭션 시작 준비 단계로 JpaTransactionManager.doBegin() 에서는 EntityManager 를 생성하고 getJpaDialect().beginTransaction() 를 통해 JPA 구현체인 하이버네이트 HibernateJpaDialect 에게 넘겨준다.

 

HibernateJpaDialect.java

@Override
public Object beginTransaction(EntityManager entityManager, TransactionDefinition definition)
        throws PersistenceException, SQLException, TransactionException {
    SessionImplementor session = getSession(entityManager);
    // ...
    preparedCon = session.connection();
    // ...
    // Standard JPA transaction begin call for full JPA context setup...
    entityManager.getTransaction().begin();
    // ...
}

HibernateJpaDialect 에서 EntityManager 로 Session 을 생성하고 Session 으로 DB 에 접근할 수 있는 connection 을 생성한다.

트랜잭션 종료

JpaTransactionManager.java

@Override
protected void doCleanupAfterCompletion(Object transaction) {
    JpaTransactionObject txObject = (JpaTransactionObject) transaction;
    // ...
    EntityManager em = txObject.getEntityManagerHolder().getEntityManager();
    // ...
    EntityManagerFactoryUtils.closeEntityManager(em);
    // ...
}

트랜잭션이 commit 혹은 rollback 동작을 마친 후 맨 마지막에 JpaTransactionManager.doCleanupAfterCompletion() 를 호출한다.

 

EntityManagerFactoryUtils.java

/**
 * Close the given JPA EntityManager,
 */
public static void closeEntityManager(@Nullable EntityManager em) {
    // ...
    em.close();
    // ...
}

마침내 EntityManagerFactoryUtil.closeEntityManager() 를 통해 EntityManager 는 close 된다.

에러가 나는 이유

@Transactional 범위, 즉 tableARepository.extract() 메소드가 종료되면

EntityManager close
→ EntityManager 가 관리하는 Session close
→ Session으로 Connection 을 받아온 후 쿼리 결과를 담는 resultSet 도 close
stream close

의 단계를 거쳐 위의 에러가 났던 것이다.

해결

데이터 접근이 필요한 부분까지 @Transactional 범위를 확장시킨다.

Transformer.java

/** 
  ② 데이터 가공하기 (Transform)
**/
@Transactional
public void transform() {
    try (Stream<RecruitSearchSelect> stream = tableARepository.extract()) {
        stream.forEach(tableA-> {
            String id = tableA.getId();
                    // 데이터 가공
                    tableBRepository.find(id);
                    tableCRepository.find(id);
                    tableDRepository.find(id);
                    // ...
        });
    }
}

extract() 가 아닌 데이터 가공까지 하는 transform()@Transactional 를 명시하였고 그 후로 위 에러는 발생하지 않았다.

 

레퍼런스

https://private-space.tistory.com/98

[JPA] QueryDSL stream DB connection 에러 해결 과정 ② - @Transactional

 

글 읽어주셔서 언제나 감사합니다. 좋은 피드백, 개선 피드백 너무나도 환영합니다.