package org.openimaj.rdf.storm.sparql.topology.bolt.sink;

import backtype.storm.spout.KestrelThriftClient;
import com.hp.hpl.jena.graph.Graph;
import com.hp.hpl.jena.graph.Triple;
import com.hp.hpl.jena.query.Query;
import com.hp.hpl.jena.sparql.engine.QueryIterator;
import com.hp.hpl.jena.sparql.graph.GraphFactory;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.thrift7.TException;
import org.openimaj.kestrel.KestrelServerSpec;
import org.openimaj.rdf.storm.sparql.topology.bolt.StormSPARQLReteConflictSetBolt;
import org.openjena.riot.RiotWriter;

/* loaded from: input_file:org/openimaj/rdf/storm/sparql/topology/bolt/sink/KestrelConflictSetSink.class */
public class KestrelConflictSetSink implements StormSPARQLReteConflictSetBolt.StormSPARQLReteConflictSetBoltSink {
    private static final Logger logger = Logger.getLogger(KestrelConflictSetSink.class);
    private List<String> kestrelServers;
    private String queue;
    private int port;
    private List<KestrelThriftClient> clients;
    private int clientIndex;
    private QuerySolutionSerializer qss;
    private List<String> varOrder;

    public KestrelConflictSetSink(List<KestrelServerSpec> list, String str) {
        this.clientIndex = 0;
        this.kestrelServers = new ArrayList();
        this.port = 0;
        this.queue = str;
        this.qss = QuerySolutionSerializer.RDF_NTRIPLES;
        for (KestrelServerSpec kestrelServerSpec : list) {
            this.kestrelServers.add(kestrelServerSpec.host);
            this.port = kestrelServerSpec.port;
        }
    }

    public KestrelConflictSetSink(List<KestrelServerSpec> list, String str, QuerySolutionSerializer querySolutionSerializer) {
        this(list, str);
        this.qss = querySolutionSerializer;
    }

    @Override // org.openimaj.rdf.storm.sparql.topology.bolt.StormSPARQLReteConflictSetBolt.StormSPARQLReteConflictSetBoltSink
    public void instantiate(StormSPARQLReteConflictSetBolt stormSPARQLReteConflictSetBolt) {
        Query query = stormSPARQLReteConflictSetBolt.getQuery();
        if (query.isSelectType()) {
            this.varOrder = query.getResultVars();
        }
        this.clients = new ArrayList();
        for (String str : this.kestrelServers) {
            try {
                this.clients.add(new KestrelThriftClient(str, this.port));
            } catch (TException e) {
                logger.error("Failed to create Kestrel client for host: " + str);
                throw new RuntimeException((Throwable) e);
            }
        }
    }

    @Override // org.openimaj.rdf.storm.sparql.topology.bolt.StormSPARQLReteConflictSetBolt.StormSPARQLReteConflictSetBoltSink
    public void consumeTriple(Triple triple) {
        try {
            KestrelThriftClient nextClient = nextClient();
            Graph createGraphMem = GraphFactory.createGraphMem();
            createGraphMem.add(triple);
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            RiotWriter.writeTriples(byteArrayOutputStream, createGraphMem);
            byteArrayOutputStream.flush();
            nextClient.put(this.queue, Arrays.asList(ByteBuffer.wrap(byteArrayOutputStream.toByteArray())), 10000);
        } catch (Exception e) {
            logger.error("Failed to write triple to kestrel client, " + e.getMessage(), e);
        }
    }

    private KestrelThriftClient nextClient() {
        KestrelThriftClient kestrelThriftClient = this.clients.get(this.clientIndex);
        this.clientIndex++;
        if (this.clientIndex == this.clients.size()) {
            this.clientIndex = 0;
        }
        return kestrelThriftClient;
    }

    @Override // org.openimaj.rdf.storm.sparql.topology.bolt.StormSPARQLReteConflictSetBolt.StormSPARQLReteConflictSetBoltSink
    public void consumeBindings(QueryIterator queryIterator) {
        try {
            KestrelThriftClient nextClient = nextClient();
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            this.qss.serialize(queryIterator, this.varOrder, byteArrayOutputStream);
            nextClient.put(this.queue, Arrays.asList(ByteBuffer.wrap(byteArrayOutputStream.toByteArray())), 10000);
        } catch (Exception e) {
            logger.error("Failed to write bindings to kestrel client, " + e.getMessage(), e);
        }
    }

    @Override // org.openimaj.rdf.storm.sparql.topology.bolt.StormSPARQLReteConflictSetBolt.StormSPARQLReteConflictSetBoltSink
    public void close() {
        Iterator<KestrelThriftClient> it = this.clients.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }
}
