Hi,
I've written a test custom serializer, modeled on the examples and on other
decoders I've seen. The code itself does little; the real problem is an
exception when Flume tries to load it:
2015-02-18 12:04:39,452 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: test07_sink started
2015-02-18 12:04:39,464 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.RollingFileSink.start(RollingFileSink.java:136)] RollingFileSink test07_sink started.
2015-02-18 12:04:39,481 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:132)] Initializing ReliableSpoolingFileEventReader with directory=/opt/bigdata/flume/files-in, metaDir=.flumespool, deserializer=LINE
2015-02-18 12:04:39,482 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Polling sink runner starting
2015-02-18 12:04:39,483 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:167)] Opening output stream for file /opt/bigdata/flume/file-sink/1424257479324-1
2015-02-18 12:04:39,486 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.serialization.EventSerializerFactory.getInstance(EventSerializerFactory.java:48)] Not in enum, loading builder class: com.produban.flume.TestSerializer
2015-02-18 12:04:39,488 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.FlumeException: Unable to instantiate Builder from com.produban.flume.TestSerializer: does not appear to implement org.apache.flume.serialization.EventSerializer$Builder
    at org.apache.flume.serialization.EventSerializerFactory.getInstance(EventSerializerFactory.java:64)
    at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:171)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
2015-02-18 12:04:39,513 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:154)] Successfully created and deleted canary file: /opt/bigdata/flume/files-in/flume-spooldir-perm-check-4651476069921147664.canary
The agent configuration for the sink is:
# OpenBank | BigData | test07: file as sink
# First destination on disk
test07.sinks.test07_sink.type = file_roll
test07.sinks.test07_sink.channel = test07_channel
test07.sinks.test07_sink.sink.directory = /opt/bigdata/flume/file-sink
test07.sinks.test07_sink.sink.serializer = com.produban.flume.TestSerializer
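
Judging only from the log message "Not in enum, loading builder class" and the exception text, the factory seems to treat the configured name as a builder class and to reject it when it is not an EventSerializer$Builder. The sketch below is not Flume's code; it reproduces that kind of check with hypothetical stand-in types (Serializer, DemoSerializer) so it runs without Flume on the classpath. It shows that an outer serializer class and its nested Builder are two different class names, and only the nested one passes the check. Whether this is the actual cause here is an assumption I have not confirmed.

```java
import java.io.OutputStream;

public class BuilderLookupSketch {

    // Hypothetical stand-ins for EventSerializer and its nested Builder
    // interface, so the sketch compiles without Flume.
    interface Serializer {
        interface Builder {
            Serializer build(OutputStream out);
        }
    }

    // Same layout as TestSerializer: the outer class is the serializer,
    // the nested static class is the builder.
    static class DemoSerializer implements Serializer {
        static class Builder implements Serializer.Builder {
            @Override
            public Serializer build(OutputStream out) {
                return new DemoSerializer();
            }
        }
    }

    // Loads a class by name and reports whether it implements
    // Serializer.Builder. Returns false if the class cannot be found.
    static boolean implementsBuilder(String className) {
        try {
            Class<?> c = Class.forName(className);
            return Serializer.Builder.class.isAssignableFrom(c);
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // getName() returns the binary name, with '$' before nested classes.
        String outer = DemoSerializer.class.getName();
        String nested = DemoSerializer.Builder.class.getName();
        System.out.println(outer + " -> " + implementsBuilder(outer));
        System.out.println(nested + " -> " + implementsBuilder(nested));
    }
}
```

In this sketch the outer class name fails the check and the name ending in $Builder passes it, which mirrors the wording of the exception above.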
The class code is simple, and it compiles without errors:
package com.produban.flume;

import java.io.IOException;
import java.io.OutputStream;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.Configurable;
import org.apache.flume.serialization.*;

public class TestSerializer implements EventSerializer, Configurable {

    private boolean appendNewline = false;
    private OutputStream out;

    public TestSerializer(OutputStream out, Context context, Object object) {
        // TODO Auto-generated constructor stub
        this.appendNewline =
            context.getBoolean("appendNewline", Boolean.valueOf(true)).booleanValue();
        this.out = out;
    }

    public EventSerializer build(Context arg0, OutputStream arg1) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void afterCreate() throws IOException {
        // TODO Auto-generated method stub
    }

    @Override
    public void afterReopen() throws IOException {
        // TODO Auto-generated method stub
    }

    @Override
    public void beforeClose() throws IOException {
        // TODO Auto-generated method stub
    }

    @Override
    public void flush() throws IOException {
        // TODO Auto-generated method stub
    }

    @Override
    public boolean supportsReopen() {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public void write(Event arg0) throws IOException {
        // TODO Auto-generated method stub
        String newBody;
        System.console().writer().println("TestSerializer content:");
        System.console().writer().println(arg0.getBody().toString());
        System.console().writer().println("TestSerializer to write:");
        newBody = arg0.getBody().toString() + "," + arg0.getBody().toString().length();
        arg0.setBody(newBody.getBytes());
        System.console().writer().println(arg0.getBody().toString());
        this.out.write((arg0.getHeaders() + " ").getBytes());
        this.out.write(arg0.getBody());
        if (this.appendNewline) {
            this.out.write(10);
        }
    }

    @Override
    public void configure(Context arg0) {
        // TODO Auto-generated method stub
    }

    public static class Builder implements EventSerializer.Builder {
        public EventSerializer build(Context context, OutputStream out) {
            TestSerializer s = new TestSerializer(out, context, null);
            return s;
        }
    }
}
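
A side note on write(), separate from the loading error: getBody() returns a byte array, and calling toString() on a Java array yields a type-and-hash string such as "[B@..." rather than the text. A minimal sketch of decoding the body before appending its length follows, assuming the body holds UTF-8 text; the class and method names (BodyTextSketch, bodyAsText, appendLength) are hypothetical and not part of Flume.

```java
import java.nio.charset.StandardCharsets;

public class BodyTextSketch {

    // Decodes an event body, assuming it holds UTF-8 encoded text.
    static String bodyAsText(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    // Appends ",<character count>" to the decoded text, which is the
    // transformation write() above is aiming for.
    static byte[] appendLength(byte[] body) {
        String text = bodyAsText(body);
        return (text + "," + text.length()).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        // Array identity string, not the text content.
        System.out.println(body.toString());
        // Decoded text with its length appended.
        System.out.println(bodyAsText(appendLength(body)));
    }
}
```

With the body "hello", the second line printed is "hello,5".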
FlumeTestSink.jar is located under
/opt/bigdata/flume/apache-flume-1.5.2-bin/lib and it is loaded.
Any idea what the problem might be?
Thanks in advance,
Juan
Juan Francisco Tavira Manjón