package test.com.top_logic.kafka.services;

import com.top_logic.basic.config.ConfigurationException;
import com.top_logic.basic.config.InstantiationContext;
import com.top_logic.kafka.sync.knowledge.service.TLSyncRecord;
import com.top_logic.kafka.sync.knowledge.service.importer.KBDataProcessor;
import com.top_logic.kafka.sync.knowledge.service.importer.KBDataWriter;
import com.top_logic.knowledge.event.ChangeSet;
import com.top_logic.knowledge.event.EventWriter;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/* loaded from: input_file:test/com/top_logic/kafka/services/TestingKBDataProcessor.class */
public class TestingKBDataProcessor extends KBDataProcessor {
    private final Set<Long> _processedRevisions;

    public TestingKBDataProcessor(InstantiationContext instantiationContext, KBDataProcessor.Config config) throws ConfigurationException {
        super(instantiationContext, config);
        this._processedRevisions = new HashSet();
    }

    protected void process(KBDataWriter kBDataWriter, EventWriter eventWriter, ConsumerRecord<String, TLSyncRecord<ChangeSet>> consumerRecord) {
        super.process(kBDataWriter, eventWriter, consumerRecord);
        registerProcessedRevision(((ChangeSet) ((TLSyncRecord) consumerRecord.value()).getRecord()).getRevision());
    }

    private void registerProcessedRevision(long j) {
        synchronized (this._processedRevisions) {
            this._processedRevisions.add(Long.valueOf(j));
            this._processedRevisions.notifyAll();
        }
    }

    public Collection<Long> waitUntilAllProcessed(Iterable<Long> iterable, Duration duration) throws InterruptedException {
        long now = now() + duration.toMillis();
        Iterator<Long> it = iterable.iterator();
        synchronized (this._processedRevisions) {
            while (it.hasNext()) {
                Long next = it.next();
                while (!this._processedRevisions.contains(next)) {
                    long now2 = now();
                    if (now2 >= now) {
                        HashSet hashSet = new HashSet();
                        hashSet.add(next);
                        while (it.hasNext()) {
                            Long next2 = it.next();
                            if (!this._processedRevisions.contains(next2)) {
                                hashSet.add(next2);
                            }
                        }
                        return hashSet;
                    }
                    this._processedRevisions.wait(now - now2);
                }
            }
            return Collections.emptySet();
        }
    }

    private long now() {
        return System.currentTimeMillis();
    }
}
