Repository: nutch Updated Branches: refs/heads/master e53b34b23 -> 836b2e01d
NUTCH-2320 URLFilterChecker to run as TCP Telnet service Project: http://git-wip-us.apache.org/repos/asf/nutch/repo Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/836b2e01 Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/836b2e01 Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/836b2e01 Branch: refs/heads/master Commit: 836b2e01d1a4e0e9443601da755ea37de91b8c7d Parents: e53b34b Author: Markus Jelsma <mar...@apache.org> Authored: Wed Oct 5 14:53:05 2016 +0200 Committer: Markus Jelsma <mar...@apache.org> Committed: Wed Oct 5 14:53:05 2016 +0200 ---------------------------------------------------------------------- .../org/apache/nutch/net/URLFilterChecker.java | 181 +++++++++++++------ 1 file changed, 122 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nutch/blob/836b2e01/src/java/org/apache/nutch/net/URLFilterChecker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/nutch/net/URLFilterChecker.java b/src/java/org/apache/nutch/net/URLFilterChecker.java index 89a3d00..86b91e2 100644 --- a/src/java/org/apache/nutch/net/URLFilterChecker.java +++ b/src/java/org/apache/nutch/net/URLFilterChecker.java @@ -17,16 +17,27 @@ package org.apache.nutch.net; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + import org.apache.nutch.plugin.Extension; import org.apache.nutch.plugin.ExtensionPoint; import org.apache.nutch.plugin.PluginRepository; +import org.apache.nutch.util.NutchConfiguration; import org.apache.hadoop.conf.Configuration; -import org.apache.nutch.util.NutchConfiguration; - -import java.io.BufferedReader; -import java.io.InputStreamReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Checks one given filter or all filters. @@ -36,62 +47,118 @@ import java.io.InputStreamReader; public class URLFilterChecker { private Configuration conf; - + private static String filterName = null; + protected static boolean keepClientCnxOpen = false; + protected static int tcpPort = -1; + protected URLFilters filters = null; + + public static final Logger LOG = LoggerFactory + .getLogger(URLFilterChecker.class); + public URLFilterChecker(Configuration conf) { + System.out.println("Checking combination of all URLFilters available"); this.conf = conf; + if (filterName != null) { + this.conf.set("plugin.includes", filterName); + } + filters = new URLFilters(this.conf); } + + public void run() throws Exception { + // In listening mode? + if (tcpPort == -1) { + // No, just fetch and display + checkStdin(); + } else { + // Listen on socket and start workers on incoming requests + listen(); + } + } + + private void listen() throws Exception { + ServerSocket server = null; + + try{ + server = new ServerSocket(); + server.bind(new InetSocketAddress(tcpPort)); + LOG.info(server.toString()); + } catch (Exception e) { + LOG.error("Could not listen on port " + tcpPort); + System.exit(-1); + } + + while(true){ + Worker worker; + try{ + worker = new Worker(server.accept()); + Thread thread = new Thread(worker); + thread.start(); + } catch (Exception e) { + LOG.error("Accept failed: " + tcpPort); + System.exit(-1); + } + } + } + + private class Worker implements Runnable { + private Socket client; - private void checkOne(String filterName) throws Exception { - URLFilter filter = null; - - ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint( - URLFilter.X_POINT_ID); - - if (point == null) - throw new RuntimeException(URLFilter.X_POINT_ID + " not found."); - - Extension[] extensions = point.getExtensions(); + Worker(Socket client) { + this.client = client; + LOG.info(client.toString()); + } - for (int i = 0; i < extensions.length; i++) { - Extension extension = extensions[i]; - filter = (URLFilter) extension.getExtensionInstance(); - if (filter.getClass().getName().equals(filterName)) { - break; + public void run() { + if (keepClientCnxOpen) { + while (true) { // keep connection open until closes + readWrite(); + } } else { - filter = null; + readWrite(); + + try { // close ourselves + client.close(); + } catch (Exception e){ + LOG.error(e.toString()); + } } } + + protected void readWrite() { + String line; + BufferedReader in = null; + PrintWriter out = null; + + try{ + in = new BufferedReader(new InputStreamReader(client.getInputStream())); + } catch (Exception e) { + LOG.error("in or out failed"); + System.exit(-1); + } - if (filter == null) - throw new RuntimeException("Filter " + filterName + " not found."); - - // jerome : should we keep this behavior? - // if (LogFormatter.hasLoggedSevere()) - // throw new RuntimeException("Severe error encountered."); - - System.out.println("Checking URLFilter " + filterName); - - BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); - String line; - while ((line = in.readLine()) != null) { - String out = filter.filter(line); - if (out != null) { - System.out.print("+"); - System.out.println(out); - } else { - System.out.print("-"); - System.out.println(line); + try{ + line = in.readLine(); + + String result = filters.filter(line); + String output; + if (result != null) { + output = "+" + result + "\n"; + } else { + output = "-" + line + "\n";; + } + + client.getOutputStream().write(output.getBytes(Charset.forName("UTF-8"))); + }catch (Exception e) { + LOG.error("Read/Write failed: " + e); } } } - private void checkAll() throws Exception { - System.out.println("Checking combination of all URLFilters available"); - + private void checkStdin() throws Exception { BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); String line; + while ((line = in.readLine()) != null) { - URLFilters filters = new URLFilters(this.conf); String out = filters.filter(line); if (out != null) { System.out.print("+"); @@ -105,30 +172,26 @@ public class URLFilterChecker { public static void main(String[] args) throws Exception { - String usage = "Usage: URLFilterChecker (-filterName filterName | -allCombined) \n" + String usage = "Usage: URLFilterChecker (-filterName filterName | -allCombined) [-listen <port>] [-keepClientCnxOpen]) \n" + "Tool takes a list of URLs, one per line, passed via STDIN.\n"; - if (args.length == 0) { + if (args.length < 1) { System.err.println(usage); System.exit(-1); } - String filterName = null; - if (args[0].equals("-filterName")) { - if (args.length != 2) { - System.err.println(usage); - System.exit(-1); + for (int i = 0; i < args.length; i++) { + if (args[i].equals("-filterName")) { + filterName = args[++i]; + } else if (args[i].equals("-listen")) { + tcpPort = Integer.parseInt(args[++i]); + } else if (args[i].equals("-keepClientCnxOpen")) { + keepClientCnxOpen = true; } - filterName = args[1]; } - + URLFilterChecker checker = new URLFilterChecker(NutchConfiguration.create()); - if (filterName != null) { - checker.checkOne(filterName); - } else { - checker.checkAll(); - } - + checker.run(); System.exit(0); } }