package com.top_logic.kafka.sync.knowledge.service.importer;

import com.top_logic.basic.Logger;
import com.top_logic.basic.StringServices;
import com.top_logic.basic.config.ConfigurationException;
import com.top_logic.basic.config.InstantiationContext;
import com.top_logic.basic.config.PolymorphicConfiguration;
import com.top_logic.basic.config.TypedConfiguration;
import com.top_logic.basic.db.schema.properties.DBProperties;
import com.top_logic.basic.logging.LogUtil;
import com.top_logic.kafka.log.KafkaLogUtil;
import com.top_logic.kafka.services.consumer.ConsumerProcessor;
import com.top_logic.kafka.sync.knowledge.service.TLSyncRecord;
import com.top_logic.kafka.sync.knowledge.service.TLSyncUtils;
import com.top_logic.knowledge.event.ChangeSet;
import com.top_logic.knowledge.event.EventWriter;
import com.top_logic.knowledge.event.convert.EventRewriter;
import com.top_logic.knowledge.event.convert.StackedEventWriter;
import com.top_logic.knowledge.service.KBUtils;
import com.top_logic.knowledge.service.KnowledgeBase;
import com.top_logic.knowledge.service.KnowledgeBaseFactory;
import com.top_logic.knowledge.service.KnowledgeBaseName;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

/* loaded from: input_file:com/top_logic/kafka/sync/knowledge/service/importer/KBDataProcessor.class */
public class KBDataProcessor implements ConsumerProcessor<String, TLSyncRecord<ChangeSet>> {
    private final KnowledgeBase _kb;
    private final List<? extends EventRewriter> _rewriters;
    private final DBProperties _dbProperties;

    /* loaded from: input_file:com/top_logic/kafka/sync/knowledge/service/importer/KBDataProcessor$Config.class */
    public interface Config extends PolymorphicConfiguration<KBDataProcessor>, KnowledgeBaseName {
        List<PolymorphicConfiguration<EventRewriter>> getRewriters();
    }

    public KBDataProcessor(InstantiationContext instantiationContext, Config config) throws ConfigurationException {
        this._kb = KnowledgeBaseFactory.getInstance().getKnowledgeBase(config.getKnowledgeBase());
        this._dbProperties = new DBProperties(KBUtils.getConnectionPool(this._kb));
        this._rewriters = TypedConfiguration.getInstanceListReadOnly(instantiationContext, config.getRewriters());
    }

    public void process(ConsumerRecords<String, TLSyncRecord<ChangeSet>> consumerRecords) {
        LogUtil.withLogMark(TLSyncUtils.LOG_MARK_TL_SYNC, "true", () -> {
            processWithLogMark(consumerRecords);
        });
    }

    protected void processWithLogMark(ConsumerRecords<String, TLSyncRecord<ChangeSet>> consumerRecords) {
        if (consumerRecords.isEmpty()) {
            return;
        }
        logInfo("Begin: Processing " + consumerRecords.count() + " changesets.");
        try {
            KBDataWriter kBDataWriter = new KBDataWriter(this._kb);
            try {
                EventWriter createWriter = StackedEventWriter.createWriter(0, kBDataWriter, this._rewriters);
                try {
                    Iterator it = consumerRecords.iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        String stringTLMessageId = KafkaLogUtil.toStringTLMessageId(consumerRecord.headers());
                        long revision = ((ChangeSet) ((TLSyncRecord) consumerRecord.value()).getRecord()).getRevision();
                        ((TLSyncRecord) consumerRecord.value()).getSystemId();
                        LogUtil.withBeginEndLogging(KBDataProcessor.class, " Processing record" + stringTLMessageId + " with changeset " + revision + " from system " + stringTLMessageId + ".", () -> {
                            process(kBDataWriter, createWriter, consumerRecord);
                        });
                    }
                    logInfo("End: Processing.");
                    if (createWriter != null) {
                        createWriter.close();
                    }
                    kBDataWriter.close();
                } catch (Throwable th) {
                    if (createWriter != null) {
                        try {
                            createWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th3) {
            logError("End with exception: Processing.", th3);
            throw th3;
        }
    }

    protected void process(KBDataWriter kBDataWriter, EventWriter eventWriter, ConsumerRecord<String, TLSyncRecord<ChangeSet>> consumerRecord) {
        ChangeSet changeSet = (ChangeSet) ((TLSyncRecord) consumerRecord.value()).getRecord();
        long systemId = ((TLSyncRecord) consumerRecord.value()).getSystemId();
        if (checkMessageOrder(systemId, ((TLSyncRecord) consumerRecord.value()).getLastMessageRevision(), changeSet)) {
            kBDataWriter.setRevisionKey(TLSyncUtils.getProcessedRevisionKey(systemId));
            eventWriter.write(changeSet);
        }
    }

    private boolean checkMessageOrder(long j, long j2, ChangeSet changeSet) {
        Long lastProcessedRevision = lastProcessedRevision(j);
        long revision = changeSet.getRevision();
        if (lastProcessedRevision == null) {
            return checkFirstMessage(j, j2, revision);
        }
        if (lastProcessedRevision.longValue() >= revision) {
            logInfo("Skipping old changeset " + revision + ". Last processed changeset: " + this);
            return false;
        }
        checkLastSentVsProcessedRevision(j, j2, lastProcessedRevision.longValue(), revision);
        return true;
    }

    private boolean checkFirstMessage(long j, long j2, long j3) {
        if (j2 == -1 || j2 == -2) {
            return true;
        }
        throw failStartOfCommunicationIsMissing(j, j2, j3);
    }

    private RuntimeException failStartOfCommunicationIsMissing(long j, long j2, long j3) {
        RuntimeException runtimeException = new RuntimeException("Detected that messages are missing, when receiving changeset " + j3 + " from system " + runtimeException + ". This is the first received message. But the message states that the last processed changeset should have been " + j + ".");
        throw runtimeException;
    }

    private void checkLastSentVsProcessedRevision(long j, long j2, long j3, long j4) {
        if (j2 == -2) {
            return;
        }
        if (j2 >= j4) {
            failLastSentRevisionTooHigh(j, j2, j4);
        }
        if (j2 > j3) {
            failMissingMessages(j, j2, j3, j4);
        }
        if (j2 < j3) {
            failLastSentRevisionTooLow(j, j2, Long.valueOf(j3), j4);
        }
    }

    private void failLastSentRevisionTooHigh(long j, long j2, long j3) {
        RuntimeException runtimeException = new RuntimeException("Received inconsistent message from system " + j + ". It contains changeset " + runtimeException + " but states that the last sent message before that was " + j3 + ".");
        throw runtimeException;
    }

    private void failMissingMessages(long j, long j2, long j3, long j4) {
        RuntimeException runtimeException = new RuntimeException("Detected that messages are missing, when receiving changeset " + j4 + " from system " + runtimeException + ". The last processed changeset was " + j + ". But the new message states that the last processed changeset should have been " + runtimeException + ".");
        throw runtimeException;
    }

    private void failLastSentRevisionTooLow(long j, long j2, Long l, long j3) {
        RuntimeException runtimeException = new RuntimeException("Detected an inconsistency when receiving changeset " + j3 + " from system " + runtimeException + ". The last processed changeset was " + j + ". But the new message states that the last processed changeset should have been " + runtimeException + ".");
        throw runtimeException;
    }

    private Long lastProcessedRevision(long j) {
        String property = this._dbProperties.getProperty(TLSyncUtils.getProcessedRevisionKey(j));
        if (StringServices.isEmpty(property)) {
            return null;
        }
        return Long.valueOf(Long.parseLong(property));
    }

    private void logError(String str, Throwable th) {
        Logger.error(str + " Cause: " + th.getMessage(), th, KBDataProcessor.class);
    }

    private void logInfo(String str) {
        Logger.info(str, KBDataProcessor.class);
    }
}
