package com.top_logic.kafka.sync.knowledge.service.exporter;

import com.top_logic.base.context.TLSubSessionContext;
import com.top_logic.basic.Logger;
import com.top_logic.basic.StringServices;
import com.top_logic.basic.config.InstantiationContext;
import com.top_logic.basic.config.PolymorphicConfiguration;
import com.top_logic.basic.config.TypedConfiguration;
import com.top_logic.basic.config.annotation.Format;
import com.top_logic.basic.config.annotation.Mandatory;
import com.top_logic.basic.config.annotation.Name;
import com.top_logic.basic.config.annotation.defaults.FloatDefault;
import com.top_logic.basic.config.annotation.defaults.FormattedDefault;
import com.top_logic.basic.config.annotation.defaults.LongDefault;
import com.top_logic.basic.config.format.MillisFormat;
import com.top_logic.basic.db.schema.properties.DBProperties;
import com.top_logic.basic.exception.I18NRuntimeException;
import com.top_logic.basic.logging.LogUtil;
import com.top_logic.basic.sql.ConnectionPool;
import com.top_logic.basic.sql.PooledConnection;
import com.top_logic.basic.util.ExponentialBackoff;
import com.top_logic.basic.util.ResKey;
import com.top_logic.basic.util.ResKey2;
import com.top_logic.basic.util.StopWatch;
import com.top_logic.kafka.services.producer.KafkaProducerService;
import com.top_logic.kafka.services.producer.TLKafkaProducer;
import com.top_logic.kafka.sync.knowledge.service.TLSyncRecord;
import com.top_logic.kafka.sync.knowledge.service.TLSyncUtils;
import com.top_logic.knowledge.event.ChangeSet;
import com.top_logic.knowledge.event.ChangeSetReader;
import com.top_logic.knowledge.event.EventWriter;
import com.top_logic.knowledge.event.convert.EventRewriter;
import com.top_logic.knowledge.event.convert.StackedEventWriter;
import com.top_logic.knowledge.service.ExtIDFactory;
import com.top_logic.knowledge.service.HistoryManager;
import com.top_logic.knowledge.service.HistoryUtils;
import com.top_logic.knowledge.service.KBUtils;
import com.top_logic.knowledge.service.KnowledgeBase;
import com.top_logic.knowledge.service.KnowledgeBaseFactory;
import com.top_logic.knowledge.service.KnowledgeBaseName;
import com.top_logic.knowledge.service.ReaderConfigBuilder;
import com.top_logic.knowledge.service.Revision;
import com.top_logic.knowledge.service.db2.DBKnowledgeBase;
import com.top_logic.knowledge.service.db2.UpdateChainLink;
import com.top_logic.util.TLContextManager;
import com.top_logic.util.sched.task.impl.StateHandlingTask;
import com.top_logic.util.sched.task.result.TaskResult;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:com/top_logic/kafka/sync/knowledge/service/exporter/KBDataProducerTask.class */
public class KBDataProducerTask extends StateHandlingTask<Config<?>> implements EventWriter {
    /** Character searched for by {@code getDate}/{@code getRevision} to split a stored "revision@date" value. */
    private static final char REVISION_DATE_SEPARATOR = '@';
    /** Node name passed to every {@link DBProperties} access made by this task. */
    private static final String NODE = "__global__";
    /** Producer looked up by its configured name; may be {@code null} if the lookup failed (an error is reported then). */
    private final TLKafkaProducer<String, TLSyncRecord<ChangeSet>> _producer;
    /** Knowledge base resolved from the configured knowledge base name. */
    private final KnowledgeBase _kb;
    /** Property access on the connection pool of {@link #_kb}; used to read the last message revision. */
    private final DBProperties _dbProperties;
    /** Writer chain built from the configured rewriters that finally delegates to this task's {@code write}. */
    private final EventWriter _eventWriter;
    /** Bounded buffer of recently sent records, or {@code null} when the configured cache size is not positive. */
    private final Queue<SentRecord> _lastSentEvents;
    /** Lazily created by {@code calcErrorPause}; set back to {@code null} by {@code resetExponentialBackoff}. */
    private ExponentialBackoff _exponentialBackoff;
    /** Runs started before this point in time are skipped; starts at {@link Long#MIN_VALUE}, i.e. never skip. */
    private long _resumeTime;

    /* loaded from: input_file:com/top_logic/kafka/sync/knowledge/service/exporter/KBDataProducerTask$Config.class */
    /**
     * Configuration of the {@link KBDataProducerTask}.
     *
     * @param <I> the task implementation type being configured
     */
    public interface Config<I extends KBDataProducerTask> extends StateHandlingTask.Config<I>, KnowledgeBaseName {

        /** Property name of {@link #getKafkaProducer()}. */
        String KAFKA_PRODUCER = "kafka-producer";

        /** Property name of {@link #getRewriters()}. */
        String REWRITERS = "rewriters";

        /** Name under which the producer is looked up in the {@link KafkaProducerService}. */
        @Mandatory
        @Name(KAFKA_PRODUCER)
        String getKafkaProducer();

        /** Rewriters that are stacked in front of the task when writing change sets. */
        @Name(REWRITERS)
        List<PolymorphicConfiguration<? extends EventRewriter>> getRewriters();

        /**
         * Maximum age of a foreign lock entry before it is treated as timed out.
         * Presumably milliseconds, as it is compared against a difference of timestamps -- confirm.
         */
        @LongDefault(30000)
        long getLockTimeout();

        /** Capacity of the buffer of recently sent records; a value of zero or less disables the buffer. */
        int getCachedEventSize();

        /** Start value handed to the {@link ExponentialBackoff} that computes the pause after an error. */
        @Format(MillisFormat.class)
        @FormattedDefault("1min")
        long getErrorPauseStart();

        /** Factor handed to the {@link ExponentialBackoff} that computes the pause after an error. */
        @FloatDefault(2.0f)
        float getErrorPauseFactor();

        /** Maximum value handed to the {@link ExponentialBackoff} that computes the pause after an error. */
        @Format(MillisFormat.class)
        @FormattedDefault("10min")
        long getErrorPauseMax();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/top_logic/kafka/sync/knowledge/service/exporter/KBDataProducerTask$ProducerException.class */
    /**
     * Signals that handing a record to the Kafka producer failed; carries an
     * internationalized error key together with the original cause.
     */
    public static final class ProducerException extends I18NRuntimeException {

        /**
         * @param errorKey the message key describing the failure
         * @param cause the exception that made the send fail
         */
        public ProducerException(ResKey errorKey, Throwable cause) {
            super(errorKey, cause);
        }
    }

    /* loaded from: input_file:com/top_logic/kafka/sync/knowledge/service/exporter/KBDataProducerTask$SentRecord.class */
    /** A record that was handed to the producer, paired with the time this entry was created. */
    public class SentRecord {

        private final ProducerRecord<String, TLSyncRecord<ChangeSet>> _producerRecord;

        private final Date _sentDate;

        /**
         * @param producerRecord the record that has been sent
         */
        SentRecord(ProducerRecord<String, TLSyncRecord<ChangeSet>> producerRecord) {
            _sentDate = new Date();
            _producerRecord = producerRecord;
        }

        /** The record that has been sent. */
        public ProducerRecord<String, TLSyncRecord<ChangeSet>> getRecord() {
            return _producerRecord;
        }

        /** The creation time of this entry. */
        public Date getDate() {
            return _sentDate;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Date: ");
            text.append(_sentDate);
            text.append(": ");
            text.append(getRecord());
            return text.toString();
        }
    }

    /**
     * Creates the task from its configuration.
     *
     * <p>
     * Looks up the configured Kafka producer (reporting an error to the context when it is
     * unknown), sets up the optional buffer of recently sent records, resolves the knowledge
     * base and builds the writer chain from the configured rewriters.
     * </p>
     *
     * @param instantiationContext context used to report configuration errors and to instantiate the rewriters
     * @param config the task configuration
     */
    public KBDataProducerTask(InstantiationContext instantiationContext, Config<?> config) {
        super(instantiationContext, config);
        this._resumeTime = Long.MIN_VALUE;
        this._producer = findProducer();
        if (this._producer == null) {
            instantiationContext.error("No " + TLKafkaProducer.class.getName() + " with name '" + config.getKafkaProducer() + "' known in the " + KafkaProducerService.class.getSimpleName());
        }
        int cachedEventSize = config.getCachedEventSize();
        if (cachedEventSize <= 0) {
            // Caching of sent records is switched off.
            this._lastSentEvents = null;
        } else {
            // Generic instantiation instead of the raw type.
            this._lastSentEvents = new CircularFifoQueue<>(cachedEventSize);
        }
        this._kb = KnowledgeBaseFactory.getInstance().getKnowledgeBase(((Config<?>) getConfig()).getKnowledgeBase());
        this._dbProperties = createDbProperties(this._kb);
        this._eventWriter = StackedEventWriter.createWriter(0, this, TypedConfiguration.getInstanceListReadOnly(instantiationContext, config.getRewriters()));
    }

    /** Asks the {@link KafkaProducerService} for the producer with the configured name. */
    private TLKafkaProducer<String, TLSyncRecord<ChangeSet>> findProducer() {
        KafkaProducerService producers = KafkaProducerService.getInstance();
        String producerName = ((Config<?>) getConfig()).getKafkaProducer();
        return producers.getProducer(producerName);
    }

    /**
     * Creates the property accessor on the connection pool of the given knowledge base.
     *
     * @param knowledgeBase must be a {@link DBKnowledgeBase}, otherwise the cast fails
     */
    private DBProperties createDbProperties(KnowledgeBase knowledgeBase) {
        DBKnowledgeBase dbKnowledgeBase = (DBKnowledgeBase) knowledgeBase;
        return new DBProperties(dbKnowledgeBase.getConnectionPool());
    }

    /** Runs {@link #runWithLogMark()} through {@code LogUtil.withLogMark} with the TL-Sync log mark set to "true". */
    protected void runHook() {
        LogUtil.withLogMark(TLSyncUtils.LOG_MARK_TL_SYNC, "true", this::runWithLogMark);
    }

    /**
     * Performs one run of the task: skips while an error pause is active, otherwise tries to
     * obtain the send lock and, on success, sends all revisions that have not been sent yet.
     */
    protected void runWithLogMark() {
        long startTime = now();
        if (startTime < this._resumeTime) {
            // An earlier failure scheduled a pause that is not over yet.
            getLog().taskEnded(TaskResult.ResultType.WARNING, I18NConstants.SKIPPING_DUE_TO_EARLIER_PROBLEMS__RESUME_TIME.fill(new Date(this._resumeTime)));
            return;
        }
        HistoryManager history = this._kb.getHistoryManager();
        UpdateChainLink newestLink = this._kb.getUpdateChain().current();
        Map<String, String> stored = getRevisionProperties();
        String lockValue = stored.get(TLSyncUtils.getLastSentRevisionAtDateLockKey());
        String lastSentValue = stored.get(TLSyncUtils.getLastSentRevisionAtDateKey());
        Long lastSentRevision = checkLockAndGetLastSentRevision(startTime, newestLink.getRevision(), lockValue, lastSentValue);
        if (lastSentRevision != null) {
            logDebugLockObtained();
            processRevisions(startTime, history, newestLink, lastSentValue, lastSentRevision);
        }
        // Otherwise the check has already reported the task result.
    }

    /** Writes a debug message telling that the send lock has been obtained. */
    private void logDebugLockObtained() {
        if (!isLoggingDebug()) {
            return;
        }
        logDebug("Lock obtained.");
    }

    /**
     * Tries to take the send lock and determines the revision after which sending has to resume.
     *
     * <p>
     * Whenever {@code null} is returned, the task result has already been reported to the task log.
     * </p>
     *
     * @param runStart timestamp of the current run, stored in the lock value
     * @param currentRevision newest revision of the knowledge base
     * @param lockValue stored lock value ("revision@date"), or {@code null} if none is stored
     * @param lastSentValue stored value of the last sent revision ("revision@date"), may be {@code null}
     * @return the last sent revision if the lock was taken and there is something to send, {@code null} otherwise
     */
    private Long checkLockAndGetLastSentRevision(long runStart, long currentRevision, String lockValue, String lastSentValue) {
        if (lockValue == null) {
            // Nothing stored yet: try to create the lock entry.
            boolean created = compareAndSetLastRevisionLock(lockValue, toString(currentRevision, runStart));
            if (!created) {
                getLog().taskEnded(TaskResult.ResultType.SUCCESS, I18NConstants.SENDING_IN_PROGRESS);
                return null;
            }
            return 1L;
        }

        boolean lockIsFree = lockValue.equals(lastSentValue);
        if (!lockIsFree) {
            // Lock and last sent value differ: someone holds the lock, unless it is stale.
            long lockAge = now() - getDate(lockValue);
            if (lockAge > ((Config<?>) getConfig()).getLockTimeout()) {
                compareAndSetLastRevisionLock(lockValue, lastSentValue);
                getLog().taskEnded(TaskResult.ResultType.ERROR, I18NConstants.LOCK_TIMED_OUT__TIME.fill(StopWatch.toStringMillis(lockAge)));
            } else {
                getLog().taskEnded(TaskResult.ResultType.SUCCESS, I18NConstants.SENDING_IN_PROGRESS);
            }
            return null;
        }

        long lastSentRevision = getRevision(lastSentValue);
        if (lastSentRevision == currentRevision) {
            getLog().taskEnded(TaskResult.ResultType.SUCCESS, I18NConstants.NO_CHANGES);
            return null;
        }
        if (currentRevision < lastSentRevision) {
            getLog().taskEnded(TaskResult.ResultType.SUCCESS, I18NConstants.SENDING_IN_PROGRESS);
            return null;
        }
        if (!compareAndSetLastRevisionLock(lockValue, toString(currentRevision, runStart))) {
            getLog().taskEnded(TaskResult.ResultType.SUCCESS, I18NConstants.SENDING_IN_PROGRESS);
            return null;
        }
        return Long.valueOf(lastSentRevision);
    }

    /**
     * Sends every change set after the last sent revision up to the given update chain link.
     *
     * <p>
     * After each successfully written change set the "last sent revision" property is updated.
     * The lock property is refreshed once more than two thirds of the lock timeout have passed
     * since it was last written. The loop ends when no change set is left, when the task is
     * asked to stop, or when writing fails. In any case the session revision is updated and the
     * lock property is finally set to the value of the last sent revision, which releases it.
     * </p>
     *
     * <p>
     * Cleanup is expressed with try-with-resources and {@code finally}, so each cleanup step runs
     * exactly once per exit path, in the order: close reader, update session revision, write lock
     * property. The unhandled-exception message reflects whether revisions were sent before the
     * failure.
     * </p>
     *
     * @param j time at which the lock was taken
     * @param historyManager history manager of the knowledge base
     * @param updateChainLink newest update chain link; its revision is the last one to send
     * @param str stored value of the last sent revision, written back to the lock if nothing is sent
     * @param l last revision that has been sent already
     */
    private void processRevisions(long j, HistoryManager historyManager, UpdateChainLink updateChainLink, String str, Long l) {
        boolean progress = false;
        long lockTime = j;
        String lastSentValue = str;
        try {
            Revision startRevision = historyManager.getRevision(l.longValue() + 1);
            Revision stopRevision = historyManager.getRevision(updateChainLink.getRevision());
            logDebugBeginSending(startRevision, stopRevision);
            TLSubSessionContext session = TLContextManager.getSubSession();
            UpdateChainLink chain = createUpdateChain(startRevision, updateChainLink);
            try (ChangeSetReader reader = this._kb.getChangeSetReader(ReaderConfigBuilder.createConfig(startRevision, stopRevision))) {
                if (getShouldStop()) {
                    getLog().taskEnded(TaskResult.ResultType.CANCELED, I18NConstants.CANCELED_DURING_STARTUP);
                    return;
                }
                while (true) {
                    logDebugReadingNextRevision();
                    ChangeSet changeSet = reader.read();
                    if (changeSet == null) {
                        logDebugFinishedSending(startRevision, stopRevision, chain.getRevision());
                        logDebugFinishedSendingNormally();
                        break;
                    }
                    logDebugReadNextChangeset(changeSet);
                    // Advance the chain to the link of the revision that is about to be sent.
                    while (chain.getRevision() != changeSet.getRevision()) {
                        chain = chain.getNextUpdate();
                        logDebugMovingToNextRevision(chain, changeSet);
                    }
                    HistoryUtils.updateSessionAndInteractionRevision(historyManager, session, chain);
                    try {
                        this._eventWriter.write(changeSet);
                        logInfoFinishedSendingRevision(changeSet.getRevision());
                        long now = now();
                        if (now - lockTime > (((Config<?>) getConfig()).getLockTimeout() * 2) / 3) {
                            // Refresh the lock before it is considered timed out.
                            setLastRevisionLock(toString(updateChainLink.getRevision(), now));
                            lockTime = now;
                        }
                        lastSentValue = toString(changeSet.getRevision(), now);
                        setLastRevision(lastSentValue);
                        progress = true;
                        logDebugStoreRevisionHasBeenSent(changeSet.getRevision());
                        if (getShouldStop()) {
                            getLog().taskEnded(TaskResult.ResultType.CANCELED, getMessageTaskCanceled(startRevision, changeSet));
                            return;
                        }
                        resetExponentialBackoff();
                    } catch (ProducerException sendFailure) {
                        logTaskEndedWriteFailed(progress, chain.getRevision(), sendFailure);
                        this._resumeTime = System.currentTimeMillis() + calcErrorPause();
                        return;
                    }
                }
            } catch (Throwable processingFailure) {
                logErrorProcessingRevisionsFailed(chain.getRevision(), processingFailure);
                throw processingFailure;
            } finally {
                historyManager.updateSessionRevision();
            }
            logTaskEndedSuccessfully(startRevision, stopRevision);
        } catch (Throwable failure) {
            logErrorSendingFailed(failure);
            this._resumeTime = System.currentTimeMillis() + calcErrorPause();
            // Report whether at least one revision was sent before the failure.
            getLog().taskEnded(TaskResult.ResultType.ERROR, getMessageUnhandledException(progress), failure);
        } finally {
            // Lock value == last sent value marks the lock as free.
            setLastRevisionLock(lastSentValue);
        }
    }

    /**
     * Writes a debug message with the range of revisions that is about to be sent.
     *
     * @param revision first revision to send
     * @param revision2 last revision to send
     */
    private void logDebugBeginSending(Revision revision, Revision revision2) {
        if (isLoggingDebug()) {
            long firstCommit = revision.getCommitNumber();
            long lastCommit = revision2.getCommitNumber();
            logDebug("Begin sending. First revision: " + firstCommit + ". Last revision " + lastCommit);
        }
    }

    /** Writes a debug message telling that the next revision is being read. */
    private void logDebugReadingNextRevision() {
        if (!isLoggingDebug()) {
            return;
        }
        logDebug("Reading next revision.");
    }

    /**
     * Writes a debug message telling that no more revisions are available.
     *
     * @param revision revision at which sending started
     * @param revision2 revision at which sending was planned to end
     * @param j revision at which sending actually ended
     */
    private void logDebugFinishedSending(Revision revision, Revision revision2, long j) {
        if (isLoggingDebug()) {
            long start = revision.getCommitNumber();
            long plannedEnd = revision2.getCommitNumber();
            logDebug("Finished sending. No more revisions. Start was: " + start + ". Planned end was: " + plannedEnd + ". Actual end was: " + j);
        }
    }

    /**
     * Writes a debug message summarizing the change set that has just been read: revision,
     * commit date, kind, author, the number of creations, updates, deletions and branch events,
     * and the commit log.
     *
     * @param changeSet the change set read from the reader
     */
    private void logDebugReadNextChangeset(ChangeSet changeSet) {
        if (isLoggingDebug()) {
            long revision = changeSet.getRevision();
            long date = changeSet.getCommit().getDate();
            String kind = String.valueOf(changeSet.getCommit().getKind());
            String author = changeSet.getCommit().getAuthor();
            int creations = changeSet.getCreations().size();
            int updates = changeSet.getUpdates().size();
            int deletions = changeSet.getDeletions().size();
            int branchEvents = changeSet.getBranchEvents().size();
            String log = String.valueOf(changeSet.getCommit().getLog());
            // Each label is followed by the value it names.
            logDebug("Read next changeset. Revision: " + revision + ". Date: " + date + ". Kind: " + kind + ". Author: " + author + ". Creates: " + creations + ". Updates: " + updates + ". Deletes: " + deletions + ". Branch events: " + branchEvents + ". Log: " + log);
        }
    }

    /**
     * Writes a debug message telling that the update chain was advanced by one link.
     *
     * @param updateChainLink the link the chain was advanced to
     * @param changeSet the change set whose revision the chain is advanced towards
     */
    private void logDebugMovingToNextRevision(UpdateChainLink updateChainLink, ChangeSet changeSet) {
        if (isLoggingDebug()) {
            long chainRevision = updateChainLink.getRevision();
            long changeRevision = changeSet.getRevision();
            logDebug("Moving forward to next revision: " + chainRevision + ". Next change: " + changeRevision);
        }
    }

    /**
     * Writes an info message telling that a revision has been sent.
     *
     * @param sentRevision the revision that has been sent
     */
    private void logInfoFinishedSendingRevision(long sentRevision) {
        String message = "Finished sending revision: " + sentRevision;
        logInfo(message);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Reports to the task log that writing a change set failed.
     *
     * @param progress whether at least one revision has been sent before the failure
     * @param failedRevision the revision of the update chain at the time of the failure
     * @param failure the exception raised while writing
     */
    private void logTaskEndedWriteFailed(boolean progress, long failedRevision, ProducerException failure) {
        ResKey2 template = getMessageProducerException(progress);
        ResKey message = template.fill(Long.valueOf(failedRevision), failure.getErrorKey());
        getLog().taskEnded(TaskResult.ResultType.ERROR, message, failure);
    }

    /**
     * Writes a debug message telling that the sent revision has been stored.
     *
     * @param sentRevision the revision that was stored as sent
     */
    private void logDebugStoreRevisionHasBeenSent(long sentRevision) {
        if (!isLoggingDebug()) {
            return;
        }
        logDebug("Stored that revision " + sentRevision + " has been sent.");
    }

    /** Writes a debug message telling that sending ended without problems. */
    private void logDebugFinishedSendingNormally() {
        if (!isLoggingDebug()) {
            return;
        }
        logDebug("Finished sending normally.");
    }

    /**
     * Logs an error telling that processing the revisions failed unexpectedly.
     *
     * @param j revision of the update chain at the time of the failure
     * @param th the exception that aborted the processing
     */
    private void logErrorProcessingRevisionsFailed(long j, Throwable th) {
        String exceptionMessage = th.getMessage();
        logError("Processing the revisions failed due to an unexpected exception. Revision: " + j + ". Exception: " + exceptionMessage, th);
    }

    /**
     * Reports to the task log that all revisions have been sent.
     *
     * @param startRevision first revision that has been sent
     * @param stopRevision last revision that has been sent
     */
    private void logTaskEndedSuccessfully(Revision startRevision, Revision stopRevision) {
        Long start = Long.valueOf(startRevision.getCommitNumber());
        Long stop = Long.valueOf(stopRevision.getCommitNumber());
        getLog().taskEnded(TaskResult.ResultType.SUCCESS, I18NConstants.SEND_TO_KAFKA_SUCCESSFUL__START_REVISION__STOP_REVISION.fill(start, stop));
    }

    /**
     * Logs an error telling that sending was aborted by an unexpected exception.
     *
     * @param failure the exception that aborted the sending
     */
    private void logErrorSendingFailed(Throwable failure) {
        String message = "Sending failed due to an unexpected exception: " + failure.getMessage();
        logError(message, failure);
    }

    /**
     * Parses the part of a "revision@date" value that follows the first separator.
     *
     * @param revisionAtDate the stored value
     * @return the number after the separator
     */
    private long getDate(String revisionAtDate) {
        int separatorIndex = revisionAtDate.indexOf(REVISION_DATE_SEPARATOR);
        String datePart = revisionAtDate.substring(separatorIndex + 1);
        return Long.parseLong(datePart);
    }

    /**
     * Parses the part of a "revision@date" value that precedes the first separator.
     *
     * @param revisionAtDate the stored value
     * @return the number before the separator
     */
    private long getRevision(String revisionAtDate) {
        int separatorIndex = revisionAtDate.indexOf(REVISION_DATE_SEPARATOR);
        String revisionPart = revisionAtDate.substring(0, separatorIndex);
        return Long.parseLong(revisionPart);
    }

    /**
     * Replaces the stored lock value by a new one, provided it still has the expected value,
     * and commits the change.
     *
     * @param expected the value the lock property is expected to have, may be {@code null}
     * @param update the value to store
     * @return the result of {@code DBProperties.compareAndSet}
     * @throws RuntimeException wrapping any {@link SQLException}
     */
    private boolean compareAndSetLastRevisionLock(String expected, String update) {
        ConnectionPool pool = KBUtils.getConnectionPool(this._kb);
        PooledConnection connection = pool.borrowWriteConnection();
        try {
            boolean swapped = DBProperties.compareAndSet(connection, NODE, TLSyncUtils.getLastSentRevisionAtDateLockKey(), expected, update);
            connection.commit();
            return swapped;
        } catch (SQLException ex) {
            throw new RuntimeException(ex);
        } finally {
            releaseWriteConnection(pool, connection);
        }
    }

    /**
     * Builds a chain of new links in front of the given link, one per revision from the given
     * start revision up to the revision directly before the given link.
     *
     * @param revision the revision the returned chain starts with
     * @param updateChainLink the link the chain ends with
     * @return the first link of the chain; the given link itself if no revision lies before it
     */
    private UpdateChainLink createUpdateChain(Revision revision, UpdateChainLink updateChainLink) {
        UpdateChainLink head = updateChainLink;
        // Walk backwards so that every new link can point to its already existing successor.
        for (long rev = updateChainLink.getRevision() - 1; rev >= revision.getCommitNumber(); rev--) {
            UpdateChainLink predecessor = new UpdateChainLink(rev);
            predecessor.setNextUpdate(head);
            head = predecessor;
        }
        return head;
    }

    /**
     * Selects the message for a failed write.
     *
     * @param progress whether at least one revision has been sent before the failure
     */
    private ResKey2 getMessageProducerException(boolean progress) {
        if (progress) {
            return I18NConstants.WRITE_FAILED_BUT_PROGRESS__REVISION__EXCEPTION;
        }
        return I18NConstants.WRITE_FAILED_NO_PROGRESS__REVISION__EXCEPTION;
    }

    /**
     * Builds the message for a run that was canceled while processing.
     *
     * @param startRevision revision at which sending started
     * @param lastProcessed the change set that was processed last
     */
    private ResKey getMessageTaskCanceled(Revision startRevision, ChangeSet lastProcessed) {
        Long start = Long.valueOf(startRevision.getCommitNumber());
        Long last = Long.valueOf(lastProcessed.getRevision());
        return I18NConstants.CANCELED_DURING_PROCESSING__START_REVISION__LAST_PROCESSED.fill(start, last);
    }

    /**
     * Selects the message for a run that was aborted by an unhandled exception.
     *
     * @param progress whether at least one revision has been sent before the failure
     */
    private ResKey getMessageUnhandledException(boolean progress) {
        if (progress) {
            return I18NConstants.SEND_FAILED_UNHANDLED_EXCEPTION_BUT_PROGRESS;
        }
        return I18NConstants.SEND_FAILED_UNHANDLED_EXCEPTION_NO_PROGRESS;
    }

    /**
     * Stores the given value under the lock key.
     *
     * @param lockValue the value to store, may be {@code null}
     */
    private void setLastRevisionLock(String lockValue) {
        String lockKey = TLSyncUtils.getLastSentRevisionAtDateLockKey();
        setProperty(lockKey, lockValue);
    }

    /**
     * Stores the given value under the key of the last sent revision.
     *
     * @param lastSentValue the value to store
     */
    private void setLastRevision(String lastSentValue) {
        String lastSentKey = TLSyncUtils.getLastSentRevisionAtDateKey();
        setProperty(lastSentKey, lastSentValue);
    }

    /**
     * Formats a revision and a timestamp as "revision@date".
     *
     * @param revision the revision number
     * @param date the timestamp
     */
    private String toString(long revision, long date) {
        StringBuilder value = new StringBuilder();
        value.append(revision);
        value.append("@");
        value.append(date);
        return value.toString();
    }

    /**
     * Stores a property for the global node and commits the change.
     *
     * @param key the property key
     * @param value the value to store
     * @throws RuntimeException wrapping any {@link SQLException}
     */
    private void setProperty(String key, String value) {
        ConnectionPool pool = KBUtils.getConnectionPool(this._kb);
        PooledConnection connection = pool.borrowWriteConnection();
        try {
            DBProperties.setProperty(connection, NODE, key, value);
            connection.commit();
        } catch (SQLException ex) {
            throw new RuntimeException(ex);
        } finally {
            releaseWriteConnection(pool, connection);
        }
    }

    /**
     * Gives a write connection back to its pool. A {@link RuntimeException} raised while doing
     * so is logged and not propagated.
     *
     * @param pool the pool the connection was borrowed from
     * @param connection the connection to release
     */
    private void releaseWriteConnection(ConnectionPool pool, PooledConnection connection) {
        try {
            pool.releaseWriteConnection(connection);
        } catch (RuntimeException releaseFailure) {
            String message = "Failed to release write connection. Cause: " + releaseFailure.getMessage();
            Logger.error(message, releaseFailure, KBDataProducerTask.class);
        }
    }

    /**
     * Reads the "last sent revision" property and its lock property for the global node.
     *
     * @return the values returned by {@code DBProperties.getProperties} for the two keys
     * @throws RuntimeException wrapping any {@link SQLException}
     */
    private Map<String, String> getRevisionProperties() {
        ConnectionPool pool = KBUtils.getConnectionPool(this._kb);
        PooledConnection connection = pool.borrowReadConnection();
        try {
            String[] keys = new String[] {
                TLSyncUtils.getLastSentRevisionAtDateKey(),
                TLSyncUtils.getLastSentRevisionAtDateLockKey()
            };
            return DBProperties.getProperties(connection, NODE, keys);
        } catch (SQLException ex) {
            throw new RuntimeException(ex);
        } finally {
            releaseReadConnection(pool, connection);
        }
    }

    /**
     * Gives a read connection back to its pool. A {@link RuntimeException} raised while doing so
     * is logged and not propagated.
     *
     * @param connectionPool the pool the connection was borrowed from
     * @param pooledConnection the read connection to release
     */
    private void releaseReadConnection(ConnectionPool connectionPool, PooledConnection pooledConnection) {
        try {
            connectionPool.releaseReadConnection(pooledConnection);
        } catch (RuntimeException e) {
            // The message names the read connection, so it can be told apart from write-connection failures.
            Logger.error("Failed to release read connection. Cause: " + e.getMessage(), e, KBDataProducerTask.class);
        }
    }

    /**
     * Computes the next pause after an error, creating the backoff on first use.
     *
     * @return the next backoff value, rounded to a {@code long}
     */
    private long calcErrorPause() {
        ExponentialBackoff backoff = this._exponentialBackoff;
        if (backoff == null) {
            backoff = createExponentialBackoff();
            this._exponentialBackoff = backoff;
        }
        double pause = backoff.next().doubleValue();
        return Math.round(pause);
    }

    /** Creates a backoff from the configured start value, factor and maximum of the error pause. */
    private ExponentialBackoff createExponentialBackoff() {
        long start = ((Config<?>) getConfig()).getErrorPauseStart();
        float factor = ((Config<?>) getConfig()).getErrorPauseFactor();
        long max = ((Config<?>) getConfig()).getErrorPauseMax();
        return new ExponentialBackoff(start, factor, max);
    }

    /** Drops the current backoff, so that the next error pause starts with a freshly created one. */
    private void resetExponentialBackoff() {
        _exponentialBackoff = null;
    }

    /**
     * Sends the given change set to Kafka, then stores its revision as the last message
     * revision and remembers the sent record in the buffer of recently sent records.
     *
     * @param changeSet the change set to send
     */
    public void write(ChangeSet changeSet) {
        checkChangeSetNotEmpty(changeSet);
        TLSyncRecord<ChangeSet> payload = createTLSyncRecord(changeSet);
        ProducerRecord<String, TLSyncRecord<ChangeSet>> outgoing = createProducerRecord(payload);
        send(outgoing);
        // Only reached when sending did not throw.
        setLastMessageRevision(changeSet.getRevision());
        offerSentEvent(outgoing);
    }

    /**
     * Hands the record to the producer and waits up to ten seconds for the result.
     *
     * @param producerRecord the record to send
     * @throws ProducerException if waiting is interrupted, the send fails, or the wait times out
     */
    private void send(ProducerRecord<String, TLSyncRecord<ChangeSet>> producerRecord) {
        try {
            this._producer.send(producerRecord).get(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new ProducerException(I18NConstants.ERROR_SEND_TO_KAFKA_INTERRUPTED, e);
        } catch (ExecutionException e2) {
            throw new ProducerException(I18NConstants.ERROR_SEND_TO_KAFKA_EXCEPTION, e2);
        } catch (TimeoutException e3) {
            throw new ProducerException(I18NConstants.ERROR_SEND_TO_KAFKA_TIMEOUT, e3);
        }
    }

    /**
     * Wraps the change set into a record carrying the system id and the last message revision.
     *
     * @param changeSet the change set to wrap
     */
    private TLSyncRecord<ChangeSet> createTLSyncRecord(ChangeSet changeSet) {
        return new TLSyncRecord<ChangeSet>(
            ExtIDFactory.getInstance().getSystemId(),
            getLastMessageRevision(),
            changeSet);
    }

    /**
     * Creates a producer record for the topic of the producer with the given value.
     *
     * @param syncRecord the value of the record
     */
    private ProducerRecord<String, TLSyncRecord<ChangeSet>> createProducerRecord(TLSyncRecord<ChangeSet> syncRecord) {
        String topic = this._producer.getTopic();
        return new ProducerRecord<>(topic, syncRecord);
    }

    /**
     * Logs a warning if the change set contains neither creations, updates, deletions nor
     * branch events. The change set is not rejected.
     *
     * @param changeSet the change set about to be sent
     */
    private void checkChangeSetNotEmpty(ChangeSet changeSet) {
        boolean hasContent = !changeSet.getCreations().isEmpty()
            || !changeSet.getUpdates().isEmpty()
            || !changeSet.getDeletions().isEmpty()
            || !changeSet.getBranchEvents().isEmpty();
        if (hasContent) {
            return;
        }
        logWarning("Sending seemingly empty changeset: " + changeSet);
    }

    /**
     * Reads the stored last message revision.
     *
     * @return the stored revision, or {@code -1} if nothing (or an empty value) is stored
     */
    private long getLastMessageRevision() {
        String stored = this._dbProperties.getProperty(TLSyncUtils.getLastMessageRevisionKey());
        return StringServices.isEmpty(stored) ? -1L : Long.parseLong(stored);
    }

    /**
     * Stores the given revision as the last message revision.
     *
     * @param revision the revision of the message that has just been sent
     */
    private void setLastMessageRevision(long revision) {
        String key = TLSyncUtils.getLastMessageRevisionKey();
        setProperty(key, Long.toString(revision));
    }

    /**
     * Remembers the given record in the buffer of recently sent records, if that buffer exists.
     *
     * @param producerRecord the record that has been sent
     */
    private void offerSentEvent(ProducerRecord<String, TLSyncRecord<ChangeSet>> producerRecord) {
        Queue<SentRecord> cache = this._lastSentEvents;
        if (cache == null) {
            return;
        }
        synchronized (cache) {
            cache.add(new SentRecord(producerRecord));
        }
    }

    /** Does nothing: {@code write} already waits for the result of each send. */
    public void flush() {
    }

    /** Does nothing; in particular the producer is not closed here. */
    public void close() {
    }

    /** Whether debug logging is enabled for this class. */
    private static boolean isLoggingDebug() {
        boolean debugEnabled = Logger.isDebugEnabled(KBDataProducerTask.class);
        return debugEnabled;
    }

    /**
     * Logs the message on debug level for this class.
     *
     * @param message the text to log
     */
    private static void logDebug(String message) {
        Logger.debug(message, KBDataProducerTask.class);
    }

    /**
     * Logs the message on info level for this class.
     *
     * @param message the text to log
     */
    private static void logInfo(String message) {
        Logger.info(message, KBDataProducerTask.class);
    }

    /**
     * Logs the message on warn level for this class.
     *
     * @param message the text to log
     */
    private static void logWarning(String message) {
        Logger.warn(message, KBDataProducerTask.class);
    }

    /**
     * Logs the message, extended by the message of the cause, on error level for this class.
     *
     * @param message the text to log
     * @param cause the exception to log along with the text
     */
    private static void logError(String message, Throwable cause) {
        String text = message + " Cause: " + cause.getMessage();
        Logger.error(text, cause, KBDataProducerTask.class);
    }

    /**
     * Returns a snapshot of the recently sent records.
     *
     * @return a fixed-size list backed by a copy of the buffer content; the empty list if no
     *         buffer is configured
     */
    public List<SentRecord> getSentRecords() {
        Queue<SentRecord> cache = this._lastSentEvents;
        if (cache == null) {
            return Collections.emptyList();
        }
        synchronized (cache) {
            SentRecord[] snapshot = cache.toArray(new SentRecord[cache.size()]);
            return Arrays.asList(snapshot);
        }
    }
}
