/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.geospatial.ip2geo.action;

import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.time.Instant;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.StepListener;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.clustermanager.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
import org.opensearch.geospatial.ip2geo.action.PutDatasourceRequest;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

public class PutDatasourceTransportAction
extends HandledTransportAction<PutDatasourceRequest, AcknowledgedResponse> {
    @Generated
    private static final Logger log = LogManager.getLogger(PutDatasourceTransportAction.class);
    private final ThreadPool threadPool;
    private final DatasourceDao datasourceDao;
    private final DatasourceUpdateService datasourceUpdateService;
    private final Ip2GeoLockService lockService;

    @Inject
    public PutDatasourceTransportAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool, DatasourceDao datasourceDao, DatasourceUpdateService datasourceUpdateService, Ip2GeoLockService lockService) {
        super("cluster:admin/geospatial/datasource/put", transportService, actionFilters, PutDatasourceRequest::new);
        this.threadPool = threadPool;
        this.datasourceDao = datasourceDao;
        this.datasourceUpdateService = datasourceUpdateService;
        this.lockService = lockService;
    }

    protected void doExecute(Task task, PutDatasourceRequest request, ActionListener<AcknowledgedResponse> listener) {
        this.validateManifestFile(request);
        this.lockService.acquireLock(request.getName(), 300L, (ActionListener<LockModel>)ActionListener.wrap(lock -> {
            if (lock == null) {
                listener.onFailure((Exception)((Object)new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later", new Object[0])));
                return;
            }
            try {
                this.internalDoExecute(request, (LockModel)lock, listener);
            }
            catch (Exception e) {
                this.lockService.releaseLock((LockModel)lock);
                listener.onFailure(e);
            }
        }, exception -> listener.onFailure(exception)));
    }

    @VisibleForTesting
    protected void internalDoExecute(PutDatasourceRequest request, LockModel lock, ActionListener<AcknowledgedResponse> listener) {
        StepListener createIndexStep = new StepListener();
        this.datasourceDao.createIndexIfNotExists((StepListener<Void>)createIndexStep);
        createIndexStep.whenComplete(v -> {
            Datasource datasource = Datasource.Builder.build(request);
            this.datasourceDao.putDatasource(datasource, this.getIndexResponseListener(datasource, lock, listener));
        }, exception -> {
            this.lockService.releaseLock(lock);
            listener.onFailure(exception);
        });
    }

    @VisibleForTesting
    protected ActionListener<IndexResponse> getIndexResponseListener(final Datasource datasource, final LockModel lock, final ActionListener<AcknowledgedResponse> listener) {
        return new ActionListener<IndexResponse>(){

            public void onResponse(IndexResponse indexResponse) {
                PutDatasourceTransportAction.this.threadPool.generic().submit(() -> {
                    AtomicReference<LockModel> lockReference = new AtomicReference<LockModel>(lock);
                    try {
                        PutDatasourceTransportAction.this.createDatasource(datasource, PutDatasourceTransportAction.this.lockService.getRenewLockRunnable(lockReference));
                    }
                    finally {
                        PutDatasourceTransportAction.this.lockService.releaseLock(lockReference.get());
                    }
                });
                listener.onResponse((Object)new AcknowledgedResponse(true));
            }

            public void onFailure(Exception e) {
                PutDatasourceTransportAction.this.lockService.releaseLock(lock);
                if (e instanceof VersionConflictEngineException) {
                    listener.onFailure((Exception)new ResourceAlreadyExistsException("datasource [{}] already exists", new Object[]{datasource.getName()}));
                } else {
                    listener.onFailure(e);
                }
            }
        };
    }

    @VisibleForTesting
    protected void createDatasource(Datasource datasource, Runnable renewLock) {
        if (!DatasourceState.CREATING.equals((Object)datasource.getState())) {
            log.error("Invalid datasource state. Expecting {} but received {}", (Object)DatasourceState.CREATING, (Object)datasource.getState());
            this.markDatasourceAsCreateFailed(datasource);
            return;
        }
        try {
            this.datasourceUpdateService.updateOrCreateGeoIpData(datasource, renewLock);
        }
        catch (Exception e) {
            log.error("Failed to create datasource for {}", (Object)datasource.getName(), (Object)e);
            this.markDatasourceAsCreateFailed(datasource);
        }
    }

    private void markDatasourceAsCreateFailed(Datasource datasource) {
        datasource.getUpdateStats().setLastFailedAt(Instant.now());
        datasource.setState(DatasourceState.CREATE_FAILED);
        try {
            this.datasourceDao.updateDatasource(datasource);
        }
        catch (Exception e) {
            log.error("Failed to mark datasource state as CREATE_FAILED for {}", (Object)datasource.getName(), (Object)e);
        }
    }

    private void validateManifestFile(PutDatasourceRequest request) {
        DatasourceManifest manifest;
        try {
            URL url = new URL(request.getEndpoint());
            manifest = DatasourceManifest.Builder.build(url);
        }
        catch (Exception e) {
            log.info("Error occurred while reading a file from {}", (Object)request.getEndpoint(), (Object)e);
            throw new IllegalArgumentException(String.format(Locale.ROOT, "Error occurred while reading a file from %s: %s", request.getEndpoint(), e.getMessage()));
        }
        try {
            new URL(manifest.getUrl()).toURI();
        }
        catch (MalformedURLException | URISyntaxException e) {
            log.info("Invalid URL[{}] is provided for url field in the manifest file", (Object)manifest.getUrl(), (Object)e);
            throw new IllegalArgumentException("Invalid URL format is provided for url field in the manifest file");
        }
        if (manifest.getValidForInDays() != null && request.getUpdateInterval().days() >= manifest.getValidForInDays()) {
            throw new IllegalArgumentException(String.format(Locale.ROOT, "updateInterval %d should be smaller than %d", request.getUpdateInterval().days(), manifest.getValidForInDays()));
        }
    }
}

