AWS Lambda タイムアウトの捕捉

AWS Lambda のタイムアウトJava で作成するLambda関数に捕捉させる要件。
こういう要件、なんで Python で作らないんだという声は置いといて。。。
タイムアウトする Lambda は、ログ出力は、awslogs で CloudWatch で閲覧が可能という前提

Lambda のタイムアウトAWSが、"Task timed out" のログを出す
⇒ Cloud Watch Log をトリガーに、フィルターで、"Task timed out" を捕捉
⇒ トリガー設定した Lambda にイベント通知で該当ログ "Task timed out" と
  ロググループ、ログストリーム名が送られてくる
⇒ Cloud Watch を参照する Client を作成してタイムアウト発生したログを参照

Cloud Watch を参照する Client は、AWS SDK-Java version 2 を使うので、
Maven 依存関係

<dependency>
   <groupId>software.amazon.awssdk</groupId>
   <artifactId>cloudwatchlogs</artifactId>
   <version>2.17.168</version>
</dependency>

Cloud Watch Log をトリガーから、タイムアウト捕捉するLamnda に送られてくるデータ(JSON)は。
gzip 圧縮されて更に Base64 エンコードされたデータで、Lambdaリクエストハンドラに渡されるので、gzip 解凍の為に
Apache common の commons-compress を使って gzip 解凍する

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-compress</artifactId>
    <version>1.21</version>
</dependency>

あとついでに、以下、パターンマッチを簡単に書くために
ReturnalConsumer を使いたくて、、

<dependency>
        <groupId>org.yipuran.core</groupId>
        <artifactId>yipuran-core</artifactId>
        <version>4.32</version>
</dependency>

必要なインポート

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsRequest;
import org.yipuran.function.ReturnalConsumer;

タイムアウト捕捉する Lambda Handler

public class TimeoutCatchHandler implements RequestHandler<InputStream, String>{
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Override
    public String handleRequest(InputStream input, Context context){
        ObjectMapper mapper = new ObjectMapper();
        try{
            Map<String, Map<String, Object>> map = mapper.readValue(input, new TypeReference<Map<String, Map<String, Object>>>(){});
            String data = map.get("awslogs").get("data").toString();
            logger.info("map  = " + map);
            String json = readLog(Base64.getDecoder().decode(data));
            logger.info("json = " + json);
            AwsLogCatch logcatch = mapper.readValue(json, new TypeReference<AwsLogCatch>(){});
            String message = logcatch.getLogEvents().get(0).getMessage();

            logger.info("messageType = " + logcatch.getMessageType());
            logger.info("logGroup    = " + logcatch.getLogGroup());
            logger.info("logStream   = " + logcatch.getLogStream());
            logger.info("getLogEvents() = " + logcatch.getLogEvents());
            logger.info("id = " + logcatch.getLogEvents().get(0).getId());
            logger.info("timestamp = " + logcatch.getLogEvents().get(0).getTimestamp());
            logger.info("message = " + message);

            int begin = ReturnalConsumer.of(Matcher.class).with(Matcher::find).apply(Pattern.compile(
                    "\\d{4}\\-(0[1-9]|1[012])\\-(0[1-9]|[12][0-9]|3[01])T(0[0-9]|1[0-9]|2[0-3]):(0[0-9]|[0-5][0-9]):(0[0-9]|[0-5][0-9])\\.[0-9]{3}Z"
            ).matcher(message)).end();
            int end = ReturnalConsumer.of(Matcher.class).with(Matcher::find).apply(Pattern.compile("Task timed out").matcher(message)).start();
            String requestId = message.substring(begin, end).trim();

            logger.info("requestId = " + requestId);

            CloudWatchLogsClient client = CloudWatchLogsClient.builder().region(Region.of”ap-northeast-1”)).build();
            GetLogEventsRequest getLogEventsRequest = GetLogEventsRequest.builder()
                    .logGroupName(logcatch.getLogGroup())
                    .logStreamName(logcatch.getLogStream())
                    .startFromHead(true)
                    .build();
            
            client.getLogEvents(getLogEventsRequest).events().stream().forEach(event->{
                logger.info(event.message());
            });
        }catch(IOException e){
            logger.error(e.getMessage(), e);
        }
        return null;
    }
    private String readLog(byte[] input) {
        try(ByteArrayInputStream bin = new ByteArrayInputStream(input);GzipCompressorInputStream gin = new GzipCompressorInputStream(bin)){
            ByteArrayOutputStream bo = new ByteArrayOutputStream();
            int size = 0;
            byte[] buf = new byte[1024];
            while((size = gin.read(buf)) > 0){
                bo.write(buf, 0, size);
                bo.flush();
            }
            bo.close();
            return new String(bo.toByteArray(), StandardCharsets.UTF_8);
        }catch(IOException e){
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}

トリガーが送ったイベントデータは、RequestHandler InputStream で受け取るデータの "data"キーで受け取った
データ JSON が、gzip を圧縮されて Base64 エンコードされているので、
private String readLog(byte[] input) のメソッドで、Base64 デコードしたデータから
gzip 解凍して抽出したJSONを、以下の 読みやすい解析のための任意のクラスに、
Jacksonで変換する。

import java.io.Serializable;
import java.util.List;
import lombok.Data;
/**
 * AwsLogCatch
 */
@Data
public class AwsLogCatch implements Serializable{
    private static final long serialVersionUID = 1L;
    private String messageType;
    private String owner;
    private String logGroup;
    private String logStream;
    private List<String> subscriptionFilters;
    private List<AwsLogEvent> logEvents;
}
import java.io.Serializable;
import lombok.Data;
/**
 * CloudWatch トリガー フィルターに引っ掛かったログ
 */
@Data
public class AwsLogEvent implements Serializable{
    private static final long serialVersionUID = 1L;
    private String id;
    private long timestamp;
    private String message;
}

これが、トリガー フィルターに引っ掛かったログを解析するもので、

AwsLogCatch logcatch = mapper.readValue(json, new TypeReference<AwsLogCatch>(){});

Cloud Watch を参照する Client は、AWS SDK-Java version 2 は、
CloudWatchLogsClient client を生成して、logGroup , logStream を指定して
読み込むのである。
必要なロール権限ポリシーは、CloudWatchLogsReadOnlyAccess


PythonCloudWatchLogs — Boto3 Docs 1.21.42 documentation
に相当するクライアントで GetLogEventsRequest を作って
CloudWatchLogsClient の getLogEventsメソッドの evebts()でようやくタイムアウト発生した Lambda のログを
参照することができる。

上の

      logger.info("timestamp = " + logcatch.getLogEvents().get(0).getTimestamp());

で取得している時刻は、epoc ミリ秒なので

long epocmili = Long.parseLong(logcatch.getLogEvents().get(0).getTimestamp()));
LocalDateTime time = Instant.ofEpochMilli(epocmili).atZone(ZoneId.of("Asia/Tokyo")).toLocalDateTime();

とすれば、LocalDateTime を見ることができるだろう。