Author: markus
Date: Fri Jan 15 10:45:27 2016
New Revision: 1724771

URL: http://svn.apache.org/viewvc?rev=1724771&view=rev
Log:
NUTCH-2194 Run IndexingFilterChecker as simple Telnet server

Modified: nutch/trunk/CHANGES.txt nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java Modified: nutch/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1724771&r1=1724770&r2=1724771&view=diff ============================================================================== --- nutch/trunk/CHANGES.txt (original) +++ nutch/trunk/CHANGES.txt Fri Jan 15 10:45:27 2016 @@ -1,5 +1,7 @@ Nutch Change Log +* NUTCH-2194 Run IndexingFilterChecker as simple Telnet server (markus) + * NUTCH-2196 IndexingFilterChecker to optionally normalize (markus) * NUTCH-2195 IndexingFilterChecker to optionally follow N redirects (markus) Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java?rev=1724771&r1=1724770&r2=1724771&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java (original) +++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java Fri Jan 15 10:45:27 2016 @@ -17,6 +17,13 @@ package org.apache.nutch.indexer; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.InetSocketAddress; +import java.nio.charset.Charset; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -59,6 +66,13 @@ import org.slf4j.LoggerFactory; public class IndexingFiltersChecker extends Configured implements Tool { + protected URLNormalizers normalizers = null; + protected boolean dumpText = false; + protected boolean followRedirects = false; + // used to simulate the metadata propagated from injection + protected HashMap<String, String> metadata = new HashMap<String, String>(); + protected int tcpPort = -1; + public static final Logger LOG = LoggerFactory 
.getLogger(IndexingFiltersChecker.class); @@ -67,25 +81,19 @@ public class IndexingFiltersChecker exte } public int run(String[] args) throws Exception { - String contentType = null; String url = null; - URLNormalizers normalizers = null; - boolean dumpText = false; - boolean followRedirects = false; - - String usage = "Usage: IndexingFiltersChecker [-normalize] [-followRedirects] [-dumpText] [-md key=value] <url>"; + String usage = "Usage: IndexingFiltersChecker [-normalize] [-followRedirects] [-dumpText] [-md key=value] [-listen <port>] <url>"; if (args.length == 0) { System.err.println(usage); return -1; } - // used to simulate the metadata propagated from injection - HashMap<String, String> metadata = new HashMap<String, String>(); - for (int i = 0; i < args.length; i++) { if (args[i].equals("-normalize")) { normalizers = new URLNormalizers(getConf(), URLNormalizers.SCOPE_DEFAULT); + } else if (args[i].equals("-listen")) { + tcpPort = Integer.parseInt(args[++i]); } else if (args[i].equals("-followRedirects")) { followRedirects = true; } else if (args[i].equals("-dumpText")) { @@ -108,6 +116,88 @@ public class IndexingFiltersChecker exte } } + // In listening mode? 
+ if (tcpPort == -1) { + // No, just fetch and display + StringBuilder output = new StringBuilder(); + int ret = fetch(url, output); + System.out.println(output); + return ret; + } else { + // Listen on socket and start workers on incoming requests + listen(); + } + + return 0; + } + + protected void listen() throws Exception { + ServerSocket server = null; + + try{ + server = new ServerSocket(); + server.bind(new InetSocketAddress(tcpPort)); + LOG.info(server.toString()); + } catch (Exception e) { + LOG.error("Could not listen on port " + tcpPort); + System.exit(-1); + } + + while(true){ + Worker worker; + try{ + worker = new Worker(server.accept()); + Thread thread = new Thread(worker); + thread.start(); + } catch (Exception e) { + LOG.error("Accept failed: " + tcpPort); + System.exit(-1); + } + } + } + + private class Worker implements Runnable { + private Socket client; + + Worker(Socket client) { + this.client = client; + LOG.info(client.toString()); + } + + public void run(){ + String line; + BufferedReader in = null; + PrintWriter out = null; + + try{ + in = new BufferedReader(new InputStreamReader(client.getInputStream())); + } catch (Exception e) { + LOG.error("in or out failed"); + System.exit(-1); + } + + try{ + line = in.readLine(); + StringBuilder output = new StringBuilder(); + fetch(line, output); + + client.getOutputStream().write(output.toString().getBytes(Charset.forName("UTF-8"))); + }catch (Exception e) { + LOG.error("Read/Write failed: " + e); + } + + try { + client.close(); + } catch (Exception e){ + LOG.error(e.toString()); + } + + return; + } + } + + + protected int fetch(String url, StringBuilder output) throws Exception { if (normalizers != null) { url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT); } @@ -129,12 +219,12 @@ public class IndexingFiltersChecker exte int maxRedirects = 3; - ProtocolOutput output = getProtocolOutput(url, datum); + ProtocolOutput protocolOutput = getProtocolOutput(url, datum); Text turl = new 
Text(url); // Following redirects and not reached maxRedirects? - while (!output.getStatus().isSuccess() && followRedirects && output.getStatus().isRedirect() && maxRedirects != 0) { - String[] stuff = output.getStatus().getArgs(); + while (!protocolOutput.getStatus().isSuccess() && followRedirects && protocolOutput.getStatus().isRedirect() && maxRedirects != 0) { + String[] stuff = protocolOutput.getStatus().getArgs(); url = stuff[0]; if (normalizers != null) { @@ -144,24 +234,24 @@ public class IndexingFiltersChecker exte turl.set(url); // try again - output = getProtocolOutput(url, datum); + protocolOutput = getProtocolOutput(url, datum); maxRedirects--; } - if (!output.getStatus().isSuccess()) { - System.out.println("Fetch failed with protocol status: " - + output.getStatus()); + if (!protocolOutput.getStatus().isSuccess()) { + output.append("Fetch failed with protocol status: " + + protocolOutput.getStatus() + "\n"); return 0; } - Content content = output.getContent(); + Content content = protocolOutput.getContent(); if (content == null) { - System.out.println("No content for " + url); + output.append("No content for " + url + "\n"); return 0; } - contentType = content.getContentType(); + String contentType = content.getContentType(); if (contentType == null) { return -1; @@ -211,6 +301,7 @@ public class IndexingFiltersChecker exte .set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature)); String digest = parse.getData().getContentMeta().get(Nutch.SIGNATURE_KEY); doc.add("digest", digest); + datum.setSignature(signature); // call the scoring filters try { @@ -226,7 +317,7 @@ public class IndexingFiltersChecker exte } if (doc == null) { - System.out.println("Document discarded by indexing filter"); + output.append("Document discarded by indexing filter\n"); return 0; } @@ -236,7 +327,7 @@ public class IndexingFiltersChecker exte for (Object value : values) { String str = value.toString(); int minText = dumpText ? 
str.length() : Math.min(100, str.length()); - System.out.println(fname + " :\t" + str.substring(0, minText)); + output.append(fname + " :\t" + str.substring(0, minText) + "\n"); } } } @@ -255,8 +346,8 @@ public class IndexingFiltersChecker exte ProtocolFactory factory = new ProtocolFactory(getConf()); Protocol protocol = factory.getProtocol(url); Text turl = new Text(url); - ProtocolOutput output = protocol.getProtocolOutput(turl, datum); - return output; + ProtocolOutput protocolOutput = protocol.getProtocolOutput(turl, datum); + return protocolOutput; } public static void main(String[] args) throws Exception {