ZooKeeper 긁적이기

ZooKeeper 긁적이기

ZooKeeper는 MSA에서 Scalable한 구조를 유지가히 위해 많은 오픈소스 솔루션이 내부적으로 활용하는 Coordination 서비스라 할 수 있다. 대표적으로 Kafka에서 내부적으로 환경을 관리하고 클러스터를 유지하기 위해 사용하는 정도로 알고 있다.

ZooKeeper의 탄생을 보면 야후(Yahoo)가 아직은 그 명성이 살아 있을 때 클라우드 환경으로 전환과 맞물려 자체으로 필요에 의해 만든 후 오픈소스로 공개한 서비스이다(2011). 클라우드/MSA환경에서는 안정적인 서비스 운영을 위해 Scalable한 구조가 필수적이다. 이때 서비스의 트래픽 상황에 따라 노드 개수가 늘어나거나 줄어들어야 하며 바이너리 업데이트 배포시에도 모든 노드들에 동일한 환경이 구성되도록 해야 한다. 예를 들면 환경설정 파일에 변경이 발생할 경우 동일한 환경설정을 모든 노드들에 배포하는 것은 Time Consuming한 작업이 될 것이며 이 과정에서 의도치 않은 Human Error가 발생할 수도 있다. ZooKeeper로 할 수 있는 작업이 많이 있겠지만 적어도 이런 작업에 ZooKeeper를 이용하여 자동화를 한다면 업무 생산성이 높아질 것이다.

이 글은 이런 시나리오 상황에서 ZooKeeper로 구현할 수 있는 아주 기본적인 코드를 만들어 보고자 한다. ZooKeeper가 지원하는 기능이 어떤 것이 있으며 어떤 방식으로 동작하는지 기본적으로 알고 있으면 많은 도움이 된다. 이 경우 ZooKeeper 개발자 가이드를 먼저 참고하기 바란다.

ZooKeeper 홈페이지에 ZooKeeper Java Example이라는 것이 있긴 하지만 ZooKeeper 개념이 생소한 개발자가 보기에는 좀 난해할 것이라 생각된다. 왜냐하면 Watcher라는 개념과 StatCallback이 어떻게 동작하는지 어느정도 이론적인 이해가 있어야 하고 제공되는 예제가 개념적으로 이해하기 쉽도록 개발된 코드가 아니기 때문이다.

좀 더 명료한 이해를 위해 ZooKeeper에 등록되어 있는 환경설정 정보가 변경되었을 경우 클라이언트가 이 이벤트를 받았을 때 로컬에 있는 환경설정을 업데이트 하는 시나리오로 ZooKeeper 클라이언트 코드를 만들어 보도록 한다.

ZooKeeper에서 배포되는 클라이언트 API를 활용하려면 기본적으로 3개의 파라미터 설정이 필요하다.

  • ZooKeeper 서버정보: 연결할 ZooKeeper서버 정보를 나타내며 등록할 서버가 여러개일 경우 콤마(,)로 구분하여 기술한다. 예) 127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
  • 세션 타임아웃: 세션 타임아웃 시간(milliseconds)
  • Watcher: ZooKeeper 서버의 상태가 변경될 경우 이벤트를 받기 위해 등록하는 인터페이스

여기서 중요한 것은 Watcher이다. Watcher의 자세한 설명은 여기서 확인하기 바란다. 알아 두어야 할 것은 Watcher 이벤트는 One-time Trigger이므로 한번 발생된 이벤트는 다시 발생되지 않으므로 다시 등록하는 과정을 거쳐야 한다는 것이다.

그리고 의미 전달을 위해 예제는 ZooKeeper API와 이벤트를 처리하는 로직 사이에 최대한 Coupling을 없애는 형태로 만들도록 한다. 이를 위해 Proxy 인터페이스와 LazyListener 인터페이스를 둔다.

/*
 * Copyright (c) 2024 kt corp. All rights reserved.
 */
package app.zookeeper;

import org.apache.zookeeper.ZooKeeper;

/**
 * Interface for getting ZooKeeper instance in a loosely coupled manner.
 */
public interface Proxy {
    
    /**
     * 
     * @return ZooKeeper instance.
     */
    ZooKeeper zookeeper();
}

Proxy 인터페이스는 ZooKeeper 클라이언트 로직 처리과정에서 ZooKeeper API와 Coupling을 줄이면서 인스턴스를 생성하기 위한 용도로 사용한다.

/*
 * Copyright (c) 2024 kt corp. All rights reserved.
 */
package app.zookeeper;

import java.util.List;

/**
 * This interface is used for loading EventListeners in a loosely coupled manner.
 */
public interface LazyListener {
    
    /**
     * @return EventListeners to register in the ZooKeeper client. Through these EventListeners you can be notified
     * with the specific ZooKeeper event.
     */
    List<EventListener> listeners();
}

LazyListener 인페이스는 Watcher에서 이벤트 처리과정에서 필요로 하는 EventListener를 Loosely Coupled된 방식으로 등록하기 위해 사용된다. 나중에 설명하겠지만 EventListener는 ZooKeeper서버에서 변경이 발생할 경우 관련 이벤트를 Callback받아 처리하기 위한 용도로 사용된다.

ZooKeeper에서 변경이 발생하면 클라이언트는 Watcher라는 인퍼페이스를 통해서 해당 이벤트를 받아 처리할 수 있다. 이벤트가 발생하면 이 Watcher에서 처리야야 하는 일련의 과정들이 있지만 공통적으로 처리하는 과정들은 Framework 형태로 분리시키고 실제 업무로직으로 처리가 필요한 작업들은 EventListener에서 처리하도록 설계해 본다.

공통적으로 처리할 과정들을 Framework화 한 것이 문지기의 역할과 비슷할것 같아 Guardian이라는 클래스 이름으로 구현했다. 앞에서 설명한 두 개의 인터페이스(Proxy, LazyListeners)는 Guardian에서 ZooKeeper API와 EventListener 사이에서 Loosely하게 상호작용하도록 구성하였으며 그림으로 표현하면 아래와 같다.

[Event Listener] <--> [Guardian] <--> [ZooKeeper API] <--> [ZooKeeper Server]

Guadian은 앞서 설명한 것처럼 ZooKeeper 의 이벤트를 받을 수 있도록 Wacher인터페이스를 구현하며 발생된 이벤트 유형에 따라 등록된 EventListener가 적절하게 처리할 수 있도록 EventListener 등록작업과 Event Dispatch작업을 주로 수행한다. 아래는 EventListener 인터페이스이며 on 메소드에서 이벤트를 Callback받으면 상황에 맞게 적적할 작업을 처리하면 된다.

/**
 * Example for testing the operation of ZooKeeper
 * @author skanto.kim@kt.com
 */
package app.zookeeper;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;

/**
 * You can implement this interface if you want to get notified when the node on the ZooKeeper is changed. 
 */
public interface EventListener {
    
    /**
     * This method will be notified if the {@link WatchedEvent} is fired on the node. You can also receive the data
     * as a second parameter that belongs to the path
     * 
     * @param event
     * @param data
     * @param path
     * @param ctx
     * @param stat
     */
    void on(WatchedEvent event, byte[] data, String path, Object ctx, Stat stat) throws Exception;
}

아래는 Guardian.java 코드이며 addListener 메소드와 curry 메소드를 눈여겨 볼만 하다. 앞에서설명한 것처럼 ZooKeeper Event는 One-time 이벤트 이므로 Event처리 후 계속해서 이벤트를 받으려면 exists 메소드를 이용하여 Watch를 설정해야 한다. 이 과정은 monitor 메소드를 참고하기 바란다.

package app.zookeeper;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;

/**
 * 
 */
public class Guardian implements Watcher {

    private final Map<EventType, Map<String, EventListener>> registry = new HashMap<EventType, Map<String, EventListener>>();
    private final Proxy proxy;
    private final LazyListener lazy;
    
    /**
     * @param proxy
     * @param lazy
     */
    public Guardian(Proxy proxy, LazyListener lazy) {
        this.proxy = proxy;
        this.lazy = lazy;
    }
    
    @SuppressWarnings("incomplete-switch")
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.None) {
            switch (event.getState()) {
            case SyncConnected:
                if (event.getPath() != null) break;
                lazy.listeners().forEach((listener) -> {
                    addListener(listener);
                    monitor(annotationOf(listener).znode(), curry(listener, event));
                });
                break;
            case Expired:
                System.out.println("session expired!");
                notifyAll();
                break;
            }
        } else {
            monitor(event.getPath(), curry(listenersOf(event.getType()).get(event.getPath()), event));
        }
    }
    
    /**
     * @param path
     * @param callback
     */
    private void monitor(String path, StatCallback callback) {
        proxy.zookeeper().exists(path, true, callback, null);
    }
    
    /**
     * @param type
     * @param listener
     */
    @SuppressWarnings("serial")
    private void addListener(EventListener listener) {
        ZooKeeperWatch annotation = annotationOf(listener);
        Map<String, EventListener> listeners = listenersOf(annotation.event());
        if (listeners == null) {
            registry.put(annotation.event(), new HashMap<String, EventListener>() {{
                put(annotation.znode(), listener); 
            }});
        } else {
            listeners.put(annotation.znode(), listener); // replace the existing one
        }
    }
    
    /**
     * @param listener
     * @return
     */
    public ZooKeeperWatch annotationOf(EventListener listener) {
        try {
            Method m = listener.getClass().getMethod("on", EventListener.class.getMethods()[0].getParameterTypes());
            if (!m.isAnnotationPresent(ZooKeeperWatch.class)) {
                throw new RuntimeException(listener.getClass().getSimpleName() + "." + m.getName() 
                + " is not annotated with @" + ZooKeeperWatch.class.getSimpleName());
            }
            return m.getAnnotation(ZooKeeperWatch.class);
        } catch (Exception e) {
            throw new IllegalArgumentException("Can not find the annotation from the EventListener", e);
        }
    }

    /**
     * @param type
     * @return
     */
    public Map<String, EventListener> listenersOf(EventType type) {
        return registry.get(type);
    }
    
    /**
     * @param listener
     * @return
     */
    private StatCallback curry(final EventListener listener, final WatchedEvent event) {
        return new StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                switch(Code.get(rc)) {
                case OK:
                    break;
                case NONODE:
                    break;
                case SESSIONEXPIRED:
                case NOAUTH:
                    System.out.println("Session is expired or no auth");
                    return;
                default:
                    System.out.println("default");
                    proxy.zookeeper().exists(path, true, this, null);
                    return;
                }
                if (Code.get(rc) == Code.OK && annotationOf(listener).event() == event.getType()) {
                    try {
                        listener.on(event, proxy.zookeeper().getData(path, false, null), path, ctx, stat);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }                
            }
        };
    }
}

향후 ZooKeeper를 이용한 서비스 개발 시 이 코드를 참조할 경우가 있을 것으로 보여 EventListener등록시 Listening할 Event Type과 지켜 볼 ZooKeeper노드를 편리하게 설정할 수 있도록 java Annotation을 활용했다. Annotation을 만드는 것인 이 글의 범위를 벗어나는 것이니 관련 자료를 참고하기 바란다.

그리고 monitor 메소드에서 exists 파라미터로 StatCallback을 등록하며 이때 StatCallback 객체 생성시 JavaScript에서 많이 활용하는 currying 방식을 활용했으니 이 부분도 같이 참고하기 바란다.

ZooKeeper Node에 변경이 발생할 경우 EventListener에서 이벤트를 받아 관련작업을 처리하는데 여기서는 설정정보가 변경된 것을 가정하여 변경된 설정정보를 환경파일로 저장하는 과정을 예제로 만들었다.

/**
 * Example for testing the operation of ZooKeeper
 * @author skanto.kim@kt.com
 */
package app.zookeeper;

import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;

public class AporiaChangeListener implements EventListener {
    
    /**
     * It holds previous configuration data used for verifying the changes compared to this change
     */
    private byte prev[];

    @ZooKeeperWatch(event = EventType.NodeDataChanged, znode = "/configs/aporia")
    public void on(WatchedEvent event, byte[] data, String path, Object ctx, Stat stat) throws Exception {
        final String filename = "/home/skanto/Documents/00.Development/zookeeper/aporia.conf";
        if ((data == null && data != prev) || (data != null && !Arrays.equals(prev, data))) {
            try (OutputStream fos = new FileOutputStream(filename)) {
                fos.write(data);
                prev = data;
            }
            System.out.printf("%s - %s: %s\n", event.getType(), path, new String(data, StandardCharsets.UTF_8));
        } else {
            System.out.printf("%s - %s: %s\n", event.getType(), path, "Data is not changed!");
        }
    }
}

이와 유사하게 환경설정 정보가 여러 개일 경우를 가정하여 하나 더 만들어 본다.

/**
 * Example for testing the operation of ZooKeeper
 * @author skanto.kim@kt.com
 */
package app.zookeeper;

import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;

/**
 * 
 */
public class TelosChangeListener implements EventListener {
    
    /**
     * It holds previous configuration data used for verifying the changes compared to this change
     */
    private byte prev[];

    @ZooKeeperWatch(event = EventType.NodeDataChanged, znode = "/configs/telos")
    public void on(WatchedEvent event, byte[] data, String path, Object ctx, Stat stat) throws Exception {
        final String filename = "/home/skanto/Documents/00.Development/zookeeper/telos.conf";
        if ((data == null && data != prev) || (data != null && !Arrays.equals(prev, data))) {
            try (OutputStream fos = new FileOutputStream(filename)) {
                fos.write(data);
                prev = data;
            }
            System.out.printf("%s - %s: %s\n", event.getType(), path, new String(data, StandardCharsets.UTF_8));
        } else {
            System.out.printf("%s - %s: %s\n", event.getType(), path, "Data is not changed!");
        }
    }
}

위 코드에서 눈여겨 볼 만한 내용은 Aonnotation을 활용하여 EventType과 ZooKeeper 노드를 설정했다는 것이다. 또한 Change 이벤트가 발생하더라도 이전의 데이터와 비교하여 변경된 부분이 있는지 확인하고 변경된 부분이 있다면 파일로 저장하도록 예제를 만들었다.

자, 이제 이렇게 만든 Guardian과 2개의 Listener를 조합하여 ZooKeeperClient 예제를 만들도록 한다.

/**
 * Example for testing the operation of ZooKeeper
 * @author skanto.kim@kt.com
 */
package app.zookeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

/**
 * ZooKeeper Example
 */
public class ZooKeeperClient {
    
    /**
     * 
     */
    private ZooKeeper zookeeper;
    
    /**
     * @param server
     * @param listeners
     * @throws IOException
     */
    public void observe(String server, List<EventListener> listeners) throws IOException {
        Watcher watcher = new Guardian(() -> zookeeper, () -> listeners);
        this.zookeeper = new ZooKeeper(server, 3000, watcher);
    }
    
    /**
     * @param definitions
     * @return
     * @throws Exception
     */
    @SuppressWarnings("serial")
    private static List<EventListener> loadListeners(String[] listeners) throws Exception {
        return new ArrayList<EventListener>() {{
            for (String str: listeners) add(obj(str, EventListener.class));
        }};
    }
    
    @SuppressWarnings({ "unchecked" })
    private static <T> T obj(final String str, Class<T> iface) throws Exception {
        Class<T> clazz = (Class<T>) Class.forName(str).asSubclass(iface);
        return  clazz.getConstructor(new Class[]{ }).newInstance();
    }
    
    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws Exception {
        final String[] listeners = { "app.zookeeper.AporiaChangeListener", "app.zookeeper.TelosChangeListener" };
        ZooKeeperClient client = new ZooKeeperClient();
        client.observe("localhost:2181", loadListeners(listeners));
        synchronized (client) {
            System.out.println("Now is waiting for this thread to be terminated...");
            while (true) {
                client.wait();
            }
        }
    }
}

여기서 눈여겨 볼만한 코드는 Guardian 객체를 생성할 때 Lambda를 이용하여 앞서 설명한 Proxy와 ZazyListener 인터페이스를 구현(implement)했다는 것이다. 이렇게 하면 생성자 파라미터로 ZooKeeper와 EventListener 객체를 직접 전달하지 않고도 Guardian내부에서 필요할 시점 사용할 수 있도록 Loosely Coupled된 방식으로 객체를 전달할 수 있다.

참고로 ZooKeeper API는 다른 독립된 Thread로 Async하게 동작하므로 Main Thread가 종료되지 않도록 main 메소드에서 Thread.wait()를 활용한다. ZooKeeperClient 실행중에 종료를 해야 한다면 프로세스를 종료시키면 된다.

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다