package nl.wldelft.fews.common.sql;

import java.sql.SQLException;
import java.util.Collections;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import nl.wldelft.fews.common.synchronization.DeletedRowsTable;
import nl.wldelft.fews.common.util.RowIdSet;
import nl.wldelft.sql.ExtendedDataSource;
import nl.wldelft.sql.SQLForeignKeyConstraintViolationException;
import nl.wldelft.util.CollectionUtils;
import nl.wldelft.util.DateUtils;
import nl.wldelft.util.Disposable;
import nl.wldelft.util.Interruption;
import nl.wldelft.util.Listener;
import nl.wldelft.util.Listeners;
import nl.wldelft.util.ListenersFactory;
import nl.wldelft.util.ObjectArrayUtils;
import nl.wldelft.util.ThreadUtils;
import nl.wldelft.util.UnmodifiableList;
import org.apache.log4j.Logger;

/* loaded from: input_file:nl/wldelft/fews/common/sql/Synchronizer.class */
public class Synchronizer implements Disposable {
    private static final Logger log = Logger.getLogger(Synchronizer.class);
    private static final ThreadGroup THREAD_GROUP = new ThreadGroup("_synchronizer");
    private final ExtendedDataSource srcDataSource;
    private final SynchProfile profile;
    private final UpsertExecutor upsertExecutor;
    private final RowIdSet synchDisabledRows;
    private final ChangeDetector[] changeDetectors;
    private final Channel[] channels;
    private final boolean[] wasChannelPending;
    private final Queue<TaskRunId> taskRunQueue = new ConcurrentLinkedDeque();
    private final Set<TaskRunId> pendingTaskRuns = Collections.newSetFromMap(new ConcurrentHashMap());
    private final ExecutorService executorService = new ThreadPoolExecutor(4, 4, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), ThreadUtils.createThreadFactory(THREAD_GROUP, 4, "_synchronization_"));
    private final Thread thread = new Thread(THREAD_GROUP, this::run, "_ detect_changed_and_update");
    private volatile boolean started = false;
    private boolean initialized = false;
    private String srcMcId = null;
    private boolean disposed = false;
    private volatile boolean synchronizing = false;
    private final Semaphore sleepSemaphore = new Semaphore(0);
    private final ListenersFactory listenersFactory = new ListenersFactory();
    private final Listeners<Channel> channelStartedListeners = this.listenersFactory.get();
    private final Listeners<Channel> channelFinishedListeners = this.listenersFactory.get();
    private final Listeners<Channel> chunkFinishedListener = this.listenersFactory.get();
    private final Listeners<Synchronizer> synchFinishedListener = this.listenersFactory.get();
    private final Listeners<TaskRunId> taskRunFinishedListener = this.listenersFactory.get();
    private long lastSuccessfulSynchronizationStartTime = Long.MIN_VALUE;
    private long lastSynchronizationHighPriorityStartTime = Long.MIN_VALUE;
    private final RowIdSet srcRowIds = new RowIdSet();
    private final RowIdsWithModifications srcRowIdsWithModifications = new RowIdsWithModifications();
    private final RowIdsWithModifications dstRowIdsWithModifications = new RowIdsWithModifications();
    private volatile boolean onlySynchronizeConfiguration = false;
    private volatile boolean initialSynchFinished = false;
    private boolean anyPreviousChannelPending = false;
    private boolean anyPreviousConfigChannelPending = false;
    private final AtomicBoolean startValidation = new AtomicBoolean(false);

    public Synchronizer(ExtendedDataSource extendedDataSource, SynchProfile synchProfile, UpsertExecutor upsertExecutor, RowIdSet rowIdSet) {
        this.srcDataSource = extendedDataSource;
        this.upsertExecutor = upsertExecutor;
        this.profile = synchProfile;
        this.synchDisabledRows = rowIdSet;
        this.changeDetectors = ChangeDetector.clasz.newArrayFromMapped((UnmodifiableList) synchProfile, this::createChangeDetector);
        this.channels = Channel.clasz.newArrayFromMapped((UnmodifiableList) synchProfile, this::createChannel);
        this.wasChannelPending = new boolean[synchProfile.size()];
    }

    @Deprecated
    public Thread getThread() {
        return this.thread;
    }

    public void setOnlySynchronizeConfiguration(boolean z) {
        this.onlySynchronizeConfiguration = z;
    }

    private ChangeDetector createChangeDetector(ChannelFilter channelFilter) {
        return new ChangeDetector(this.srcDataSource, this.upsertExecutor.getDstDataSource(), channelFilter, this.srcRowIds, this.srcRowIdsWithModifications, this.dstRowIdsWithModifications);
    }

    private Channel createChannel(ChannelFilter channelFilter) {
        return new Channel(this.srcDataSource, this.upsertExecutor, this.synchDisabledRows, this.executorService, false, channelFilter, this.channelStartedListeners, this.channelFinishedListeners, this.chunkFinishedListener);
    }

    public void start() {
        this.started = true;
        this.thread.start();
    }

    public boolean isStarted() {
        return this.started;
    }

    public void queueTaskRun(TaskRunId taskRunId) {
        if (this.pendingTaskRuns.add(taskRunId)) {
            this.taskRunQueue.add(taskRunId);
            this.sleepSemaphore.release();
        }
    }

    public void startValidation() {
        this.startValidation.set(true);
        this.sleepSemaphore.release();
    }

    public boolean isPending(TaskRunId taskRunId) {
        return this.pendingTaskRuns.contains(taskRunId);
    }

    private void run() {
        while (true) {
            try {
                FewsSqlUtils.waitWhileMaintenanceMode(this.srcDataSource, this.upsertExecutor.getDstDataSource());
                FewsSqlUtils.waitWhileSwitchedOff(this.srcDataSource, this.upsertExecutor.getDstDataSource());
                FewsSqlUtils.waitWhileFailover(this.srcDataSource, this.upsertExecutor.getDstDataSource());
                validateGlobalRowIdRegeneration();
                init();
                if (this.startValidation.getAndSet(false)) {
                    validate();
                } else if (this.taskRunQueue.isEmpty()) {
                    synchronizeAll(null);
                } else {
                    CollectionUtils.pollAll(this.taskRunQueue, taskRunId -> {
                        boolean z = false;
                        try {
                            taskRunId.forEachDecorated(this::synchronizeAll);
                            z = true;
                            this.pendingTaskRuns.remove(taskRunId);
                            this.taskRunFinishedListener.fire(taskRunId);
                            if (1 == 0) {
                                this.taskRunQueue.add(taskRunId);
                            }
                        } catch (Throwable th) {
                            if (!z) {
                                this.taskRunQueue.add(taskRunId);
                            }
                            throw th;
                        }
                    });
                }
                if (!this.synchronizing) {
                    ThreadUtils.tryAcquire(this.sleepSemaphore, 3000L);
                }
                ObjectArrayUtils.forEach(this.channels, channel -> {
                    channel.getStatistics().saveWhenNeeded();
                });
            } catch (Error e) {
                log.error(e.getMessage(), e);
                return;
            } catch (Exception e2) {
                log.error(e2.getMessage(), e2);
                ThreadUtils.sleep(60000L);
            } catch (MaintenanceModeDetected | SynchSwitchedOff e3) {
                this.initialized = false;
            } catch (Interruption e4) {
                return;
            } catch (SQLForeignKeyConstraintViolationException e5) {
                log.error("Foreign key error. Search for missed rows\n" + e5.getMessage());
                resetParentTablesLastDetectTimes();
            }
        }
    }

    private void validate() throws Exception {
        log.info("Start validation synchronization channels");
        ObjectArrayUtils.forEach(this.changeDetectors, changeDetector -> {
            changeDetector.validate(this.upsertExecutor.getSynchDisabledRows());
        });
        log.info("End validation synchronization channels");
    }

    private void resetParentTablesLastDetectTimes() {
        ObjectArrayUtils.forEachWhere(this.channels, channel -> {
            return channel.getFilter().getMaxAge() == Long.MAX_VALUE;
        }, channel2 -> {
            this.changeDetectors[channel2.getFilter().getIndex()].resetLastDetectTime();
            channel2.clearPending();
        });
    }

    private void validateGlobalRowIdRegeneration() throws SQLException {
        long globalRowIdRegenerationTime = FewsSqlUtils.getGlobalRowIdRegenerationTime(this.srcDataSource);
        long globalRowIdRegenerationTime2 = FewsSqlUtils.getGlobalRowIdRegenerationTime(this.upsertExecutor.getDstDataSource());
        if (globalRowIdRegenerationTime == globalRowIdRegenerationTime2) {
            return;
        }
        FewsSqlUtils.ensureNoMaintenanceMode(this.srcDataSource);
        FewsSqlUtils.ensureSynchSwitchedOn(this.srcDataSource, this.upsertExecutor.getDstDataSource());
        throw new Error("Synchronization stopped due global row id regeneration time mismatch between both databases, src=" + this.srcDataSource + ": " + DateUtils.toString(globalRowIdRegenerationTime) + " millis: " + DateUtils.getMillisPart(globalRowIdRegenerationTime) + ", dst=" + this.upsertExecutor.getDstDataSource() + ": " + DateUtils.toString(globalRowIdRegenerationTime2) + " millis: " + DateUtils.getMillisPart(globalRowIdRegenerationTime2) + ", diff=" + DateUtils.formatRelativeMillis(globalRowIdRegenerationTime2 - globalRowIdRegenerationTime));
    }

    private void init() throws Exception {
        if (this.initialized) {
            return;
        }
        this.srcMcId = FewsSqlUtils.getMcId(this.srcDataSource);
        log.info("Read unprocessed deleted rows on server");
        RowIdSet readUnprocessedDeletedRows = DeletedRowsTable.readUnprocessedDeletedRows(this.srcDataSource);
        synchronized (this.synchDisabledRows) {
            this.synchDisabledRows.addAll(readUnprocessedDeletedRows);
        }
        ObjectArrayUtils.forEach(this.channels, channel -> {
            channel.init(this.srcMcId);
            if (this.upsertExecutor.getDstDataSource().isEmbedded()) {
                this.changeDetectors[channel.getFilter().getIndex()].setLastDetectTime(channel.getStatistics().getLastSuccessfulSynchStartTime());
            }
        });
        this.initialized = true;
        this.synchronizing = true;
    }

    private void synchronizeAll(String str) throws Exception {
        long currentServerTime = this.srcDataSource.getCurrentServerTime();
        boolean z = this.synchronizing;
        this.srcDataSource.refreshTableModificationTimesNow();
        ObjectArrayUtils.forEachReverse(this.channels, channel -> {
            detectChanges(channel, str);
        });
        if (!ObjectArrayUtils.anyMatch(this.channels, (v0) -> {
            return v0.isPending();
        })) {
            this.synchronizing = false;
            if (z) {
                this.synchFinishedListener.fire(this);
            }
            if (this.onlySynchronizeConfiguration || str != null) {
                return;
            }
            this.initialSynchFinished = true;
            return;
        }
        this.anyPreviousChannelPending = false;
        this.anyPreviousConfigChannelPending = false;
        this.lastSynchronizationHighPriorityStartTime = System.currentTimeMillis();
        ObjectArrayUtils.forEachWhere(this.channels, channel2 -> {
            return channel2.getFilter().isHighPriority() && channel2.isPending();
        }, channel3 -> {
            channel3.copyPendingRows(this.onlySynchronizeConfiguration);
        });
        ObjectArrayUtils.forEachWhere(this.channels, channel4 -> {
            return !channel4.getFilter().isHighPriority();
        }, channel5 -> {
            copyPendingRows(channel5);
            if (System.currentTimeMillis() - this.lastSynchronizationHighPriorityStartTime < 10000) {
                return;
            }
            this.lastSynchronizationHighPriorityStartTime = System.currentTimeMillis();
            synchronizeHighPriority();
        });
        if (this.anyPreviousChannelPending) {
            return;
        }
        this.lastSuccessfulSynchronizationStartTime = currentServerTime;
    }

    private void synchronizeHighPriority() throws Exception {
        ObjectArrayUtils.forEachReverseWhere(this.channels, channel -> {
            return channel.getFilter().isHighPriority();
        }, channel2 -> {
            detectChanges(channel2, null);
        });
        ObjectArrayUtils.forEachWhere(this.channels, channel3 -> {
            return channel3.getFilter().isHighPriority() && channel3.isPending();
        }, channel4 -> {
            channel4.copyPendingRows(this.onlySynchronizeConfiguration);
        });
    }

    private void detectChanges(Channel channel, String str) throws Exception {
        FewsSqlUtils.ensureNoMaintenanceMode(this.srcDataSource);
        FewsSqlUtils.ensureSynchSwitchedOn(this.srcDataSource, this.upsertExecutor.getDstDataSource());
        validateGlobalRowIdRegeneration();
        if (channel.isPending()) {
            return;
        }
        if (!this.onlySynchronizeConfiguration || channel.getFilter().isConfig()) {
            ChangeDetector changeDetector = this.changeDetectors[channel.getFilter().getIndex()];
            changeDetector.setDecoratedTaskRunId(str);
            channel.setDecoratedTaskRunId(str);
            long networkBytes = changeDetector.getNetworkBytes();
            boolean z = this.onlySynchronizeConfiguration;
            changeDetector.getClass();
            channel.updatePendingRowIds(z, changeDetector::findModifiedRows);
            channel.getStatistics().incrementNetworkBytes(changeDetector.getNetworkBytes() - networkBytes);
            if (channel.isPending()) {
                if (changeDetector.isNewRowsFound()) {
                    this.srcDataSource.refreshTableModificationTimesNow();
                }
                this.synchronizing = true;
            }
        }
    }

    private void copyPendingRows(Channel channel) throws Exception {
        if (!((this.anyPreviousChannelPending && channel.getFilter().isWaitForPendingInPreviousChannels()) || (this.anyPreviousConfigChannelPending && channel.getFilter().isWaitForPendingInPreviousConfigChannels()))) {
            channel.copyPendingRows(this.onlySynchronizeConfiguration);
        }
        boolean isPending = channel.isPending();
        this.anyPreviousChannelPending |= isPending;
        if (channel.getFilter().isConfig()) {
            this.anyPreviousConfigChannelPending |= isPending;
        }
        this.anyPreviousChannelPending |= this.wasChannelPending[channel.getFilter().getIndex()];
        this.wasChannelPending[channel.getFilter().getIndex()] = isPending;
    }

    public boolean isDisposed() {
        return this.disposed;
    }

    public void dispose() {
        if (this.disposed) {
            return;
        }
        this.disposed = true;
        ThreadUtils.stop(this.thread);
        this.executorService.shutdown();
        try {
            ObjectArrayUtils.forEach(this.channels, channel -> {
                channel.getStatistics().save();
            });
        } catch (SQLException e) {
            log.error(e.getMessage(), e);
        }
    }

    public boolean isSynchronizing() {
        return this.synchronizing;
    }

    public boolean isInitialSynchFinished() {
        return this.initialSynchFinished;
    }

    public UpsertExecutor getUpsertExecutor() {
        return this.upsertExecutor;
    }

    public SynchProfile getProfile() {
        return this.profile;
    }

    public Channel getChannel(ChannelFilter channelFilter) {
        return this.channels[channelFilter.getIndex()];
    }

    public boolean isTaskRunQueueEmpty() {
        return this.taskRunQueue.isEmpty();
    }

    public long getLastSuccessfulSynchronizationStartTime() {
        return this.lastSuccessfulSynchronizationStartTime;
    }

    public void addChannelStartedListener(Object obj, Listener<Channel> listener) {
        this.channelStartedListeners.add(obj, listener);
    }

    public void addChannelFinishedListener(Object obj, Listener<Channel> listener) {
        this.channelFinishedListeners.add(obj, listener);
    }

    public void addChunkFinishedListener(Object obj, Listener<Channel> listener) {
        this.chunkFinishedListener.add(obj, listener);
    }

    public void addSynchFinishedListener(Object obj, Listener<Synchronizer> listener) {
        this.synchFinishedListener.add(obj, listener);
    }

    public void addTaskRunFinishedListener(Object obj, Listener<TaskRunId> listener) {
        this.taskRunFinishedListener.add(obj, listener);
    }

    public int removeListeners(Object obj) {
        return this.listenersFactory.removeListeners(obj);
    }
}
