package test.com.top_logic.kafka.services;

import com.top_logic.basic.module.BasicRuntimeModule;
import com.top_logic.dob.ex.UnknownTypeException;
import com.top_logic.dob.meta.MOClass;
import com.top_logic.kafka.services.consumer.ConsumerDispatcher;
import com.top_logic.kafka.services.consumer.KafkaConsumerService;
import com.top_logic.kafka.services.producer.KafkaProducerService;
import com.top_logic.kafka.sync.knowledge.service.TLSyncRecord;
import com.top_logic.kafka.sync.knowledge.service.exporter.KBDataProducerTask;
import com.top_logic.knowledge.event.ChangeSet;
import com.top_logic.knowledge.objects.KnowledgeItem;
import com.top_logic.knowledge.objects.identifier.ExtReference;
import com.top_logic.knowledge.objects.identifier.ExtReferenceFormat;
import com.top_logic.knowledge.service.ExtIDFactory;
import com.top_logic.knowledge.service.HistoryUtils;
import com.top_logic.knowledge.service.KBUtils;
import com.top_logic.knowledge.service.KnowledgeBaseRuntimeException;
import com.top_logic.model.TLObject;
import com.top_logic.model.TLType;
import com.top_logic.model.annotate.util.TLAnnotations;
import com.top_logic.util.model.ModelService;
import com.top_logic.util.sched.Scheduler;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import junit.framework.AssertionFailedError;
import junit.framework.Test;
import test.com.top_logic.basic.AssertNoErrorLogListener;
import test.com.top_logic.basic.BasicTestCase;
import test.com.top_logic.basic.DefaultTestFactory;
import test.com.top_logic.basic.ReflectionUtils;
import test.com.top_logic.basic.TestFactory;
import test.com.top_logic.basic.module.ServiceTestSetup;
import test.com.top_logic.kafka.KafkaTestSetup;
import test.com.top_logic.knowledge.KBSetup;
import test.com.top_logic.model.AbstractTLModelTest;

/* loaded from: input_file:test/com/top_logic/kafka/services/AbstractTLSyncTest.class */
public abstract class AbstractTLSyncTest extends AbstractTLModelTest {
    private static final Duration WAIT_TIMEOUT = Duration.ofMillis(10000);
    public static final String KB_DATA_CHANGE_TOPIC = "KB-DATA-CHANGE";
    public static final String KB_CHANGE_CONSUMER = "KB-Data-Change-Consumer";

    public TLObject findReceivedObjectFor(TLType tLType, TLObject tLObject) {
        return getTLObjectByExternalId(tLType, ExtIDFactory.getInstance().newExtReference(tLObject));
    }

    private TLObject getTLObjectByExternalId(TLType tLType, ExtReference extReference) {
        Iterator<KnowledgeItem> kOByExternalId = getKOByExternalId(tLType, extReference);
        if (!kOByExternalId.hasNext()) {
            return null;
        }
        KnowledgeItem next = kOByExternalId.next();
        if (kOByExternalId.hasNext()) {
            throw new AssertionFailedError("Found multiple items with same external ID: " + String.valueOf(next) + ", " + String.valueOf(toList(kOByExternalId)));
        }
        return next.getWrapper();
    }

    private Iterator<KnowledgeItem> getKOByExternalId(TLType tLType, ExtReference extReference) {
        String table = TLAnnotations.getTable(tLType);
        return getKOByExternalId(table, extReference, ExtIDFactory.getInstance().getExternalIDAttribute(getMOClass(table)));
    }

    private Iterator<KnowledgeItem> getKOByExternalId(String str, ExtReference extReference, String str2) {
        try {
            return kb().getObjectsByAttribute(str, str2, ExtReferenceFormat.INSTANCE.format(extReference));
        } catch (UnknownTypeException e) {
            throw new KnowledgeBaseRuntimeException(e);
        }
    }

    private MOClass getMOClass(String str) {
        try {
            return kb().getMORepository().getMetaObject(str);
        } catch (UnknownTypeException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int triggerKafka() throws Exception {
        return runWithKafka(() -> {
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sync(Runnable runnable) {
        sync(false, runnable);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sync(boolean z, Runnable runnable) {
        try {
            runWithKafka(z, () -> {
                KBUtils.inTransaction(() -> {
                    runnable.run();
                });
            });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <T> T sync(Supplier<T> supplier) {
        return (T) sync(false, (Supplier) supplier);
    }

    protected <T> T sync(boolean z, Supplier<T> supplier) {
        try {
            AtomicReference atomicReference = new AtomicReference();
            runWithKafka(z, () -> {
                KBUtils.inTransaction(() -> {
                    atomicReference.set(supplier.get());
                });
            });
            return (T) atomicReference.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int runWithKafka(BasicTestCase.Execution execution) throws Exception {
        return runWithKafka(false, execution);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v49, types: [java.util.Set] */
    protected int runWithKafka(boolean z, BasicTestCase.Execution execution) throws Exception {
        HashSet hashSet;
        int size;
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        execution.run();
        AssertNoErrorLogListener assertNoErrorLogListener = z ? null : new AssertNoErrorLogListener();
        if (assertNoErrorLogListener != null) {
            assertNoErrorLogListener.activate();
        }
        try {
            TestingKBDataProcessor consumerProcessor = consumerProcessor();
            KBDataProducerTask producerTask = producerTask();
            Date date = new Date();
            producerTask.run();
            List sentRecords = producerTask.getSentRecords();
            if (sentRecords.isEmpty()) {
                size = 0;
                hashSet = Collections.emptySet();
            } else {
                hashSet = new HashSet();
                int size2 = sentRecords.size() - 1;
                while (size2 >= 0) {
                    KBDataProducerTask.SentRecord sentRecord = (KBDataProducerTask.SentRecord) sentRecords.get(size2);
                    if (sentRecord.getDate().before(date)) {
                        break;
                    }
                    hashSet.add(Long.valueOf(((ChangeSet) ((TLSyncRecord) sentRecord.getRecord().value()).getRecord()).getRevision()));
                    size2--;
                }
                size = (sentRecords.size() - size2) - 1;
            }
            Collection<Long> waitUntilAllProcessed = consumerProcessor.waitUntilAllProcessed(hashSet, WAIT_TIMEOUT);
            if (!waitUntilAllProcessed.isEmpty()) {
                throw new AssertionFailedError("Unprocessed Revisions: " + String.valueOf(waitUntilAllProcessed));
            }
            HistoryUtils.updateSessionRevision(kb().getHistoryManager());
            return size;
        } finally {
            if (assertNoErrorLogListener != null) {
                assertNoErrorLogListener.deactivate();
            }
        }
    }

    private TestingKBDataProcessor consumerProcessor() {
        ConsumerDispatcher consumer = KafkaConsumerService.getInstance().getConsumer(KB_CHANGE_CONSUMER);
        if (consumer == null) {
            throw new IllegalStateException("No consumer with name KB-Data-Change-Consumer found.");
        }
        TestingKBDataProcessor testingKBDataProcessor = null;
        for (Object obj : (List) ReflectionUtils.getValue(consumer, "_processors", List.class)) {
            if (obj instanceof TestingKBDataProcessor) {
                if (testingKBDataProcessor != null) {
                    throw new IllegalStateException("Multiple instances of TestingKBDataProcessor configured: " + String.valueOf(obj) + " vs. " + String.valueOf(testingKBDataProcessor));
                }
                testingKBDataProcessor = (TestingKBDataProcessor) obj;
            }
        }
        return testingKBDataProcessor;
    }

    private KBDataProducerTask producerTask() {
        KBDataProducerTask taskByName = Scheduler.getSchedulerInstance().getTaskByName("KafkaProducerTask");
        if (taskByName == null) {
            throw new IllegalStateException("No task with name " + "KafkaProducerTask" + " available.");
        }
        return taskByName;
    }

    public static Test suite(Class<?> cls) {
        return suite(cls, DefaultTestFactory.INSTANCE);
    }

    public static Test suite(Class<?> cls, TestFactory testFactory) {
        return KafkaTestSetup.suite((Test) KBSetup.getKBTestWithoutSetups(cls, ServiceTestSetup.createStarterFactory(Scheduler.Module.INSTANCE, ServiceTestSetup.createStarterFactory(ModelService.Module.INSTANCE, ServiceTestSetup.createStarterFactoryForModules(testFactory, new BasicRuntimeModule[]{KafkaProducerService.Module.INSTANCE, KafkaConsumerService.Module.INSTANCE}))), true));
    }
}
