- Sumologic이란? 

 Sumo Logic은 클라우드 기반 로그 관리 및 분석 플랫폼이다.

 이 도구는 기업과 조직이 대규모의 로그 데이터를 실시간으로 수집, 분석 및 시각화하여 시스템 상태, 응용 프로그램 동작 및 보안 이벤트 등을 모니터링하고 이해하는 데 도움을 줄 수 있다.

 

- sumo logic docs : https://help.sumologic.com/docs/search/search-query-language/

 

 

- Sumologic Search Query란?

 Query를 이용하여 수집된 로그에서 필요한 정보만을 추출할 수 있는 기능이다.

 

 

- Search Query Base

keyword search | parse | where | group-by | sort | limit

 Sumologic에서의 Search Query의 기본 방식은 위와 같이 파이프라인(|) 을 기준으로 다양한 조건을 추가하여 목적에 맞는 로그를 찾아낼 수 있다.

 

 보통의 경우라면 아래 예시와 같이 _sourcecategory나, _index를 사용하여 특정 로그를 지정하고 그 중 원하는 값을 쿼리에 따라 추출하게 된다.

 

 아래의 쿼리에서는 "*"로 표시된 부분의 정보에서 src_ip를 추출하여 값 별로 발생한 카운팅을 산출 하고 그 카운트에 따른 정렬을 하는 쿼리이다.

_sourcecategory=apache
| parse "* --" as src_ip
| count by src_ip | sort _count

 

 주석 의 경우 라인별 주석과 영역 주석을 사용할 수 있고 아래와 같다.

 

// - Comments out a single line of the query, or a part of a single line. 
/*  */ - Creates a multiple line comment.

 

- 주요 사용 쿼리 샘플과 설명

1) parse 사용

_index=adminapi_apne2 and "exception"
| parse "[ERROR][*]" as transactionId

 위의 쿼리는 특정 Index 로그 영역을 지정하여 그 로그에서 “exception”이라는 문자열이 들어간 로그 추출한다.

 그중에 [ERROR][*] 에서 "*" 영역 부분에 transactionId가 저장되도록 로그를 구성하였는데 그  값을 추출하여 transactionId 컬럼에 노출이 되게끔 하는 쿼리이다.

 

 2) logreduce & logcompare

_index=adminapi_apne2 and "exception"
| parse "[ERROR][*]" as transactionId | logreduce

위와 같이 추출된 내용에서 logreduce를 추가하면 동일한 값(패턴)에 대해 발생빈도 count를 확인 가능하다.

"logcompare" 를  추가하면 시간별 발생 추이 ( + 또는 - 퍼센테이지)도 확인 가능하다.

 

3) count as , order by , where 조건절 사용 

_index=api_apne2 AND "REQUEST method=")
| parse "path=[*]" as path
| count as call by path
| order by  call desc
| where call > 100

 해당 index의 로그에서 "REQUEST method” 라는 텍스트가 있는 로그만 필터링 한다.

 그리고 path=[*]에서 *에 저장된 url 정보만 path 필드로 저장(추출)한다.

 ex) path=[/api/v1/example] 가 있다면 "/api/v1/example"이 추출됨.

 path별로 grouping하고 counting해서 발생빈도를 노출한다.  

 call 값에 따라 정렬을 해주고 call의 합이 100이 넘는 호출만 노출하여 보여주도록 하는 쿼리이다.

 

4) json의 맵핑 및 조건 처리

_index=api_apne2 and "RESPONSE method"
| parse "responseBody=[*]" as body
| json field=body "code" as code
| where code != 200
| parse "path=[*]" as url
| count as call by url

  해당 인덱스의 로그에서  “RESPONSE method” 문자열을 포함한 로그만 대상으로 추출하고

  responseBody=[*] 에 * 부분에 해당하는 값 만을 추출하여 body라는 값으로 지정한다.

  이 값을 json 타입으로 mapping하여 그 값내에 code라는 값을 체크하고 그 값이 (result_code) 200이 아닌(오류) 경우에 대해서

 path를 url이라고 정의하고 추출하며 url 별로 grouping하여 카운팅한다.

 

5) keyvalue, Max() 사용

(_index=api_apne2 OR _index=admin_apne2)
AND ("/user" AND "service=Hello")
AND _sourceCategory="api-application-log"
| keyvalue auto keys "userKey" as serviceUserId refonly
| max(_messagetime) as time group by serviceUserId
| formatDate(toLong(time), "yyyy-MM-dd HH:mm:ss") as time

 

 두 개의 로그 index 내에서 "/user"라는 api 와 “service=Hello”이라는 문구를 포함한 로그 중에

 api-application-log 에 소스카테고리(로그파일 저장 path) 중  로그에 포함된 값에서 검색을 한다.

 userKey에 맵핑된 값을 serviceUserId 로 필터링하여 추출하고 serviceId별로 그룹핑하여 최고 큰 _messagetime을 가진 값만을 각각 추출한다.

 추출된 최종 대상 로그의 time값을 formatDate()에 맞게 포맷팅 하여 time 값으로 추출하여 화면에 보여준다.

 

6) substring()의 사용

(_index=api_apne2 OR _index=admin_apne2)
and "RESPONSE method"
AND _sourceCategory="/apne2/log-application-log"
| parse "path=[*]"as path
| substring(path, 0, 22) as fixed_path
| where fixed_path = "/api/sample"
| substring(path, 23, 45) as param
| count as accl by param

 두 개의 로그 index 내에서 response 문자열이 포함된 로그만을 대상으로 _sourceCategory = /apne2/log-application-log 을 대상으로 쿼리를 수행한다.

  *에 저장된 url path 내용만을 path라는 변수로 추출하고 그 중에 고정 url인 부분은 substring을 사용하여 fixed_path로 지정한다.

 fixed_path ="/api/sample" 은 로그만을 검색하여 나머지 url 부분을 substring으로 잘라서 param 변수로 처리하고  param별로 그룹핑하여 몇 번씩의 호출이 있었는지 확인한다.

 

 

 이 외에도 위에 링크를 걸어둔 Docs를 통해 더 다양한 Search query를 만들어 필요한 로그만을 효율적으로 검색하여 정보를 얻을 수 있다.

| Kafka란?

  • 카프카(Kafka)고성능 분산 이벤트 스트리밍 플랫폼이다. 흔히 메시지(이벤트) 큐라고 불리우며 Pub-Sub 모델 형태로 동작하며 분산환경에 특화되어 있다.
  • producer 라고 불리우는 source application으로 부터 전송 받은 메시지를 broker에 저장해두며 이를 consumer라는 target application에서 사용할 수 있도록 해준다.
  • kafka는 pub/sub 모델로써 consumer가 broker에 있는 메시지를 polling하는 형태로 broker의 부하를 줄여주어 효율적으로 대용량 처리를 병렬로 가능하게끔 한다.
  • 메시지를 파일 시스템에 저장하기 때문에 데이터의 영속성을 보장한다.
  • 기존의 메시징 시스템은 메시지 큐에 적재된 메시지 양이 많을수록 성능이 크게 감소하였지만, Kafka는 메시지를 파일 시스템에 저장하기 때문에 성능이 크게 감소하지 않는 이점이 있다.

kafka 기본 구성도

 

| 주요 내용 정리

  • Producer
    • 데이터를 생성하여 kafka cluster에 데이터를 전송하는 역할을 한다.
    • kafka는 topic이라는 주제에 따라 데이터를 전송하고 broker는 topic을 기준으로 구분하여 데이터를 저장한다.
    • 설정에 따라 재처리 또는 실패에 대해 관리가 가능하다. ( DLQ = Dead Letter Queue 로 관리 할 수도 있다.)
    • kafka는 key를 고유 로직으로 hashing 처리하여 저장하기에 동일한 key는 동일한 파티션에 항상 담기게 된다.
    • But! 파티션이 늘어난다면(변동이 생긴다면) 이 rule이 깨지기 때문에 rule을 유지해야 하는 서비스라면 최초에 설정시 파티션의 갯수도 적정하게 설정할 필요가 있다.
  • Broker
    • 데이터 저장 : producer로 부터 전송받은 데이터를 영속적으로 저장한다. 저장시에는 topic별로 , 그리고 토픽내 파티션에 분산하여 저장한다.
    • producer로 받은 데이터를 consumer가 가져갈 수 있게 역할을 수행. consumer는 토픽별로 데이터를 읽어가서 처리가 가능하다.
    • kafka cluster 관리 : broker는 cluster의 상태와 구성을 관리한다. 파티션 중 리더 파티션을 선출하고 레플리카의 데이터 복제를 관리.
  • Topic
    • 메시지를 저장하는 논리적인 그룹의 단위
    • RDB로 생각하면 각각의 테이블 처럼 데이터를 저장함.
    • 데이터의 형태는 다양한 형태로 저장 가능 ( ex. string, set, json 등등)
    • 하나의 토픽은 보통 1개 이상의 파티션으로 구성되어 분산 저장(처리)이 되는 형태.
    • 토픽에 저장된 데이터는 consumer가 소비해도 삭제되지 않는다. ( 고가용성이 가능한 이유 중 하나다.)
    • 메시지 삭제 정책은 존재하여 시간이나 데이터 총 사이즈로 설정할 수 있다.
  • Partition
    • 토픽은 하나의 브로커에 하나의 논리적인 곳에 저장되지 않고 데이터를 파티션으로 나누어서 저장한다.
    • 라운드-로빈 방식으로 데이터를 각 파티션에 저장한다.
    • 파티션이 나뉘어져 있어서 병렬로 데이터의 비동기 처리가 가능하다.
    • 파티션이 2개 이상이면 데이터 처리의 순서 보장은 불가능하다.
  • Consumer
    • kafka broker에 topic에 저장된 데이터를 subscribe하여 그에 맞게 처리하는 역할을 한다.
    • consumer는 보통 consumer group을 구성하여 그룹내에서 파티션을 각각 맵핑 및 접근하여 처리한다.
  • Consumer group
    • consumer group은 특정 토픽을 동일하게 처리할 때 병렬로 분산 처리하기 위해서 그룹핑된 컨슈머 집합.
      consumer group에 속한 각각의 컨슈머는 하나의 토픽에 여러개의 파티션이 있을 때 같은 파티션에는 하나의 컨슈머만 접근이 가능하도록 제한이 되어 있다. 하지만 하나의 컨슈머는 하나의 토픽에 속한 여러개의 파티션에는 중복적으로 접근하여 처리가 가능하다.
    • 컨슈머 그룹내에는 group leader가 존재하여 consumer가 이슈가 있거나 할 때 리밸런싱을 하는 등의 관리를 맡는다. reader consumer는 토픽에 최초로 ack를 보내어 응답을 받은 consumer가 자동으로 할당된다.
  • *** 컨슈머 그룹과 파티션 맵핑 예시 ****

broker와 consumer group간의 맵핑 순서

  • 1] join group 호출 (내가 너의 토픽을 consume 할거야!)
  • 2] group leader 선출 (선착순!!)
  • 3] 파티션과 컨슈머 그룹내 consumer와의 맵핑
  • 4] 해당 맵핑 정보 전송 ( Group coordinator는 해당 정보를 zookeeper에게 전달)
  • 5] 컨슈머에게 해당 내용 전파 ( 전파된 파티션으로 부터 데이터 consume 진행)

** 파티션과 컨슈머 갯수별 맵핑은??

  • 1) 토픽 파티션 1 : 컨슈머 1 => 이렇게 구성할 경우 순서의 보장은 될 수 있으나 컨슈머가 문제가 생길경우 메시지를 처리할 수 없다.(장애발생)

2) 토픽 파티션 1: 컨슈머 그룹내 컨슈머 4개 => 이럴 경우 리더 컨슈머가 문제가 생길 경우 follower 컨슈머가 그 작업을 대신할 수 있다.

(하지만 컨슈머 스레드 3개는 아무일도 하지 않고 휴면 상태와 같이 있게 된다.) 

 3) 토픽 내 파티션 1개 : 컨슈머 그룹내 컨슈머 2개 => 같은 컨슈머 그룹내에서 동일 토픽에 파티션에는 1개의 컨슈머만접근 가능한 제한으로 인해 한개의 컨슈머는 휴면 상태와 다름 없이 일을 하지 않는다.

 

4) 토픽내 파티션 4 : 컨슈머 그룹내 컨슈머 2개 => 이럴 경우 컨슈머 1대당 2개의 파티션에 붙어서 R-R방식으로 데이터를 가져와 처리한다.

5) 토픽내 파티션 4 : 컨슈머 그룹내 컨슈머 2개 => 이럴 경우 컨슈머 1대당 2개의 파티션에 붙어서 R-R방식으로 데이터를 가져와 처리한다.

6) 토픽내 파티션 4개 : 컨슈머 그룹 2개 => 각각의 컨슈머 그룹은 토픽에 파티션에 1:1로 맵핑하여 동일한 토픽 메시지를 같이 사용할 수 있다. ( 두 가지 로직을 태울 수 있다.)

위와 같이 컨슈머 그룹이 두 개라면 브로커에 하나의 토픽에 파티션은 consumer group별로 각각의 offset을 가지고 있어서 데이터가 어디까지 처리했는지 구분이 가능하게끔 제공하고 있다.

 

| 주요 config

  • Producer
속성 설명 예시
bootstrap.servers(BOOTSTRAP_SERVERS_CONFIG) 카프카 클러스터의 부트스트랩 서버 목록을 지정하고 연결하는데 사용. localhost:9092
acks(ACKS_CONFIG) 프로듀서가 메시지를 보낸 후 리더(broker)로부터 확인(acknowledgment)을 기다릴지 여부를 설정 0(기다리지 않음 - 손실 가능성 존재)
1(리더 브로커에게만 전송 여부 받기),
all(레플리케이션에 저장 여부도 받기)
retries(RETRIES_CONFIG) 메시지 전송 중에 일시적인 오류가 발생한 경우 프로듀서가 재시도할 최대 횟수를 설정 0 : 재처리 하지 않음.
5 : 5회 재전송함.
compression.type(COMPRESSION_TYPE_CONFIG) 메시지 압축 방법을 설정합니다. 압축을 사용하면 전송 대역폭을 줄일 수 있음. none, gzip, snappy, lz4, zstd
key.serializer , value.serializer 프로듀서에서 사용하는 메시지의 키(key)와 값(value)의 직렬화 방법을 설정 StringSerializer

 

  • consumer
속성 설명 예시
bootstrap.servers(BOOTSTRAP_SERVERS_CONFIG) 카프카 클러스터의 부트스트랩 서버 목록을 지정하고 연결하는데 사용. localhost:9092
group.id 컨슈머 그룹의 식별자를 설정합니다. 같은 그룹에 속한 컨슈머들은 메시지를 공유하여 소비한다. example_group_id
enable.auto.commit 자동 커밋을 사용할지 여부를 설정합니다. 자동 커밋을 활성화하면 컨슈머가 일정 주기마다 오프셋을 자동으로 커밋 true
auto.commit.interval.ms 자동 커밋 주기를 설정합니다. enable.auto.commit가 true로 설정되어 있을 때 커밋 주기마다 오프셋이 커밋 5000
auto.offset.reset 컨슈머가 처음 실행되거나 오프셋이 존재하지 않는 경우 어떻게 오프셋을 재설정할지를 설정 earliest(가장 적은 offset),
latest(가장 최근 offset),
none(오프셋을 재설정하지 않음) 
max.poll.records 단일 poll() 호출에서 반환되는 최대 레코드 수를 설정 1 or 100 , 200, 300 …
session.timeout.ms session timeout 최대 설정 시간 10000
max.poll.interval.ms polling interval 최대 시간 50000
key.deserializer ,value.deserializer 컨슈머에서 사용하는 메시지의 키(key)와 값(value)의 역직렬화 방법을 설정 StringDeserializer

 

** 리밸런싱(브로커 대상 토픽과 컨슈머의 재맵핑) 과 관련된 주요 설정들

# 요청에 대해 응답을 기다리는 최대 시간

requestTimeoutMs: 30000

# 컨슈머와 브로커사이의 세션 타임 아웃시간.

sessionTimeoutMs: 30000

# session.timeout.ms와 밀접한 관계가 있으며 session.timeout.ms보다 낮아야한다. 일반적으로 1/3

heartbeatIntervalMs: 10000

# 컨슈머가 polling하고 commit 할때까지의 대기시간

maxPollIntervalMs: 600000

# poll ()에 대한 단일 호출에서 반환되는 최대 레코드 수

maxPollRecords: 1

위에서 보면 consumer는 한 번에 하나의 레코드를 가져오고 , polling을 하여 처리대기 시간은 상대적으로 길게 잡았으며, 세션 타임아웃보다 heartbeats 인터벌은 짧게 잡아 컨슈머의 이상여부(정상 실행 상태여부) 를 체크할 수 있게 설정해야 한다.

그리고 세션 타임아웃은 폴링한 데이터가 정상적으로 처리가 가능한 이상 수준의 타임아웃을 가져야 Rebalancing 이슈가 발생하지 않는다.

그러므로 max poll record를 너무 길게 잡으면 세션 타임 아웃 등 설정 정보도 늘어나서 컨슈머가 실제로 정상적인지 체크하는데 어려움이 발생할 수 도 있다.

session.timeout.ms

이 옵션은 heartbeat 없이 얼마나 오랫동안 컨슈머가 있을 수 있는지를 제어하며 heartbeat.interval.ms와 밀접한 관련이 있어서 일반적으로 두 속성이 함께 수정된다.

heartbeat.interval.ms 컨슈머가 얼마나 자주 heartbeat을 보낼지 조정한다. session.timeout.ms보다 작아야 하며 일반적으로 1/3로 설정 ex. 3000 (3초)

max.poll.interval.ms

이러한 경우에 컨슈머가 무한정 해당 파티션을 점유할 수 없도록 주기적으로 poll을 호출하지 않으면 장애라고 판단하고 컨슈머 그룹에서 제외시키도록 하는 옵션이다.300000 (5분)

max.poll.records

이 옵션으로 polling loop에서 데이터 양을 조정 할 수 있다.

enable.auto.commit

백그라운드로 주기적으로 offset을 commit = true

auto.commit.interval.ms

주기적으로 offset을 커밋하는 시간5000 (5초)

auto.offset.reset

이전 offset값을 찾지 못하면 error 발생 Ex.latest

 

 

**추가 정보 : kafka vs rabiitMQ vs redis pub/sub 의 비교 

 

참고 정보 : 

https://velog.io/@hyun6ik/Apache-Kafka-Consumer-Rebalance

 

Apache Kafka - Consumer Rebalance

Consumer는 메시지를 가져오기 위해서 Partition에 연속적으로 Poll한다.가져온 위치를 나타내는 offset 정보를 \_\_consumer_offsets Topic에 저장하여 관리한다.동일한 group.id로 구성된 모든 Consumer들은 하나

velog.io

https://magpienote.tistory.com/254

 

[Kafka 원리]Kafka Consumer 동작 원리 이해하기 (Consumer group, rebalancing, commit_offset, option, fetch 튜닝, messa

목차 consumer란? consumer offset Consumer는 어떤 메시지를 읽을 수 있을까? consumer group Consumer 메세지 읽기 옵션 Consumer Rebalance Consumer Group간 Rebalancing 옵션 그러면 언제 언제 Rebalancing 될까? Consumer Message O

magpienote.tistory.com

https://jyeonth.tistory.com/30

 

Apache Kafka 기본 개념 (Partition / Consumer / Consumer Group/ Offset Management)

Kafka는 가장 널리 쓰이는 메세지 큐 솔루션 중 하나이다. 다른 메세지 큐와 마찬가지로, Producer가 메세지를 publish하면 Consumer가 큐를 susbscribe하며 메세지를 가져가게 된다. 다만, 이 사이에 Topic / P

jyeonth.tistory.com

https://velog.io/@holicme7/Apache-Kafka-%EC%B9%B4%ED%94%84%EC%B9%B4%EB%9E%80-%EB%AC%B4%EC%97%87%EC%9D%B8%EA%B0%80

 

[Apache Kafka] 카프카란 무엇인가?

카프카, 데이터 플랫폼의 최강자 책을 공부하며 쓴 정리 글 입니다.카프카(Kafka)는 파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위해 설계된 고성능 분산 이벤트

velog.io

https://www.popit.kr/kafka-consumer-group/

 멀티 모듈 프로젝트에 OpenFeign을 적용하고 다음과 같은 Exception이 발생하는 상황이 생겼습니다. 

The bean 'exampleService.FeignClientSpecification' could not be registered. 
Bean already defined and overriding is disabled

 

구글링을 해보니 우선 첫번째 해결 방법은 FeignClient 어노테이션 선언 부분에 중요된 name 이나 contextId가 있는지 확인해 보라는 

내용 이었습니다. 

 

@FeignClient(name="ExampleService_1", configuration = FeignClientConfig.class, url = "http://localhost:8091")
@FeignClient(name="ExampleService_2", configuration = FeignClientConfig.class, url = "http://localhost:8091")

 하지만 기존 로직에 contextId 속성은 정의하지 않았고 name도 중복되지 않았습니다. 

 현재 상황에서 contextId를 추가해도 동일했습니다. 

 

 결국 원인을 찾다보니 멀티 모듈에서 모듈별로 @EnableFeignClients 를 선언하다 보니 중복에 의한 원인이 이슈였습니다.

 

 ** 해결방법 : application.yml에 중복 설정을 해도 오버라이딩이 허용하도록 설정을 추가하여 해결했습니다.

spring.main.allow-bean-definition-overriding=true

'Spring' 카테고리의 다른 글

CommandLineRunner을 이용하여 특정 클래스 수행하기  (0) 2023.05.17

 java 에서는 premitive type, reference type이 존재한다.

  • premitive type = int, boolean, long , double 등 실제 데이터를 저장하는 타입을 말하고 ,
  • reference type 은 primitive type 이외에 reference (참조 주소)를 갖고 있는 것을 말한다.
    대표적으로 Long, Integer 등 wrapper class와 같이 class 와 array, enum 등 주소값을 저장하고 참조하는 타입을 말한다.

 배포 되어 운영되던 서비스 운영중에 이상한 일이 발생하였다.

 테이블의 key값을 Long 타입으로 저장하여 구현된 소스였는데 그 값이 다른 키 값과 동일한지 체크를 할때 "=="(a==b) 로 할때 127번까 지는 문제가 없었는데 128 이상 부터는 값이 같지 않다는 것이다.

 Long a = 13L;
 Long b = 13L;

일 경우 a==b → true 이지만 이 값이 128이 넘어가면 false가 되는 것이다.

 원인을 찾아보니 아래와 같았다.

 

이 이유는 JAVA에서 -128 ~ 127 범위의 정수 객체를 캐싱해놓기 때문이다.
(이 값들은 빈번하게 사용되기 때문에)

/** * Returns an {@code Integer} instance representing the specified 
* {@code int} value. If a new {@code Integer} instance is not 
* required, this method should generally be used in preference to 
* the constructor {@link #Integer(int)}, as this method is likely 
* to yield significantly better space and time performance by 
* caching frequently requested values. *
 * This method will always cache values in the range -128 to 127, 
* inclusive, and may cache other values outside of this range. *

 

 정리해보면 127까지는 미리 캐싱을 해두어서 == 으로 체크를 해도 메모리 주소가 동일하였기에 문제가 없었으나
 128을 넘어가면서 새로운 주소에 데이터가 저장되면서 일치하지 않는 상황인 것이다.

 

 ** 해결법reference type wrapper class 경우 '==' 아닌 .equals 사용하여 값을 동일여부를 체크하도록 구현하면 이슈는 해결된다. ex) a.equals(b) 해야한다.

@ExtendWith(MockitoExtension.class)
classMediaServiceTests {

@Mock
PhotoService photoService;

@Mock
VideoService videoService;

@InjectMocks
MediaService mediaService;

@Test
voidhasMediaByNotHasUserInfo() {
        UserDetailDto userDetailDto =newUserDetailDto();

//static method를 호출하기 위해 MockedStatic으로 선언
MockedStatic<UserDetailDto> userDetailDtoMockedStatic = Mockito.mockStatic(UserDetailDto.class);

        when(UserDetailDto.get()).thenReturn(userDetailDto);

        Assertions.assertFalse(mediaService.hasMedia(1L));
    }

Mockito를 이용하여 static method 테스트 코드 작성시에는 MockStatic을 사용해야 한다.

Mockstatic 경우 mockito 3.4.0버전 이후부터 지원하며 이를 사용하기 위해서는 Gradle에 아래와 같이 추가해줘야 한다.

 

build.gradle에 추가

testImplementation('org.mockito:mockito-inline:3.4.0')

 

위의 코드처럼 class mockito.mockstatic 타입으로 선언 후에 static method call하여 테스트 코드 작성 및 확인이 가능하다.

 

 jenkins로 배치를 실행하는 시점에 특정 클래스를 구동하고 싶을 때 CommandLineRunner를 사용하여 구현하면 수행이 가능하다.
 이렇게 애플리케이션 구동 시점에 특정 코드를 구현시키기 위해 제공하는 인터페이스는 두가지가 있다.

 

 1. CommandLineRunner

 2. ApplicationRunner

 

 기본 코드는 아래와 같다.

import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.Arrays;

@Component
public class DemoCommandLineRunner implements CommandLineRunner {

   @Override
   public void run(String... args) throws Exception {
       System.out.println("CommandLineRunner Args: " + Arrays.toString(args));
   }

}

 등록시에는 @Component annotation을 선언하여 수행이 가능하게끔 해주면 된다.

 CommandLineRunner의 경우 String args로 args[]로 특정 값을 받아 실행이 가능하다.

 ApplicationRunner 의 경우에도 크게 다른 부분은 없지만 parameter가 String이 아닌ApplicationArguments 타입으로 인자를 받아 처리가 가능하다.

 만약 CommanLineRunner나 ApplicationRunner 를 다중으로 수행하여야 할때 순서를 정리하고 싶다면 @Order annotation으로 순서를 정해줄 수 도 있다.

 jenkins에서 인자를 호출받아 특정 클래스를 수행시켜 주고 싶다면 아래와 같이 구현이 가능하다.

@Component
@RequiredArgsConstructor
public class BatchCommandLineRunner implements CommandLineRunner {

    private final ApplicationContext context;
   
    @Override
    public void run(String... args) {
        Map<String, BatchJob> jobs = context.getBeansOfType(BatchJob.class);

		//실행시킬 특정 클래스명을 인자로 받는다. 
        final String className = args[0];

		//Batchjob은 interface로 해당 클래스를 implement해서 구현하였고 run()
		//메서드를 실행하여 그에 맞는 클래스가 수행되도록 한다.
        jobs.entrySet().stream()
            .filter(job -> job.getKey().equals(className))
            .findFirst()
            .ifPresent(jobEntry -> {
                BatchJob batchJob = jobEntry.getValue();
				
                //CommandLineRunner에서 인자로 받은 String args에서 클래스명을 제외한 내용을 저장한다.
                List<String> arguments = Arrays.stream(args).skip(1).collect(Collectors.toList());
                String argumentString = Joiner.on(" ").join(arguments);

                int result = -1;
                try {
					//특정 클래스의 run()메서드가 실행되도록 한다.
                    result = batchJob.run(arguments);
                } catch (Exception e) {
                    e.printStackTrace();
                    System.exit(result);
                }
                System.exit(result);
            });
        System.exit(-1);
    }
}

 

+ Recent posts