Collecte de données et envoi à Apache Kafka

introduction



Pour analyser les données de streaming, vous avez besoin des sources de ces données. Les informations fournies par les sources sont également importantes. Et les sources contenant des informations textuelles, par exemple, sont également rares.



Les sources intéressantes sont les suivantes: twitter , vk . Mais ces sources ne conviennent pas à toutes les tâches.



Il existe des sources avec les données requises, mais ces sources ne sont pas diffusées en continu. Les liens suivants peuvent être cités ici: public-apis .



Lors de la résolution de problèmes de diffusion de données, vous pouvez utiliser l'ancienne méthode.



Téléchargez des données et envoyez-les en streaming.



Par exemple, vous pouvez utiliser la source suivante: imdb .

Il convient de noter que imdb fournit ses propres données. Voir Jeux de données IMDb . Mais on peut supposer que les données collectées contiennent directement des informations plus pertinentes.



Langue: Java 1.8.

Bibliothèques: kafka 2.6.0, jsoup 1.13.1.



Collecte de données



La collecte de données est un service qui, en fonction des données d'entrée, charge des pages html, recherche les informations nécessaires et les convertit en un ensemble d'objets.



: imdb. : https://www.imdb.com/search/title/?release_date=%s,%s&countries=%s

1, 2 – . 3 – .



: imdb-extensive-dataset.



:



public interface MovieDirectScrapingService {
    Collection<Movie> scrap();
}


Movie – , ( ..).



class Movie {
    public final String titleId;
    public final String titleUrl;
    public final String title;
    public final String description;
    public final Double rating;
    public final String genres;
    public final String runtime;
    public final String baseUrl;
    public final String baseNameUrl;
    public final String baseTitleUrl;
    public final String participantIds;
    public final String participantNames;
    public final String directorIds;
    public final String directorNames;


.



. jsoup. html- .



String scrap(String url, List<Movie> items) {
    Document doc = null;
    try {
        doc = Jsoup.connect(url).header("Accept-Language", language).get();
    } catch (IOException e) {
        e.printStackTrace();
    }
    if (doc != null) {
        collectItems(doc, items);
        return nextUrl(doc);
    }
    return "";
}


.



String nextUrl(Document doc) {
    Elements nextPageElements = doc.select(".next-page");
    if (nextPageElements.size() > 0) {
        Element hrefElement = nextPageElements.get(0);
        return baseUrl + hrefElement.attributes().get("href");
    }
    return "";
}


. . . , . .



@Override
public Collection<Movie> scrap() {
    String url = String.format(
            baseUrl + "/search/title/?release_date=%s,%s&countries=%s",
            startDate, endDate, countries
    );
    List<Movie> items = new ArrayList<>();
    String nextUrl = url;
    while (true) {
        nextUrl = scrap(nextUrl, items);
        if ("".equals(nextUrl)) {
            break;
        }
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
        }
    }
    return items;
}


.





: MovieProducer. : run.



. . .



public void run() {
    try (SimpleStringStringProducer producer = new SimpleStringStringProducer(
            bootstrapServers, clientId, topic)) {
        Collection<Data.Movie> movies = movieDirectScrapingService.scrap();
        List<SimpleStringStringProducer.KeyValueStringString> kvList = new ArrayList<>();
        for (Data.Movie move : movies) {
            Map<String, String> map = new HashMap<>();
            map.put("title_id", move.titleId);
            map.put("title_url", move.titleUrl);
            String value = JSONObject.toJSONString(map);
            String key = UUID.randomUUID().toString();
            kvList.add(new SimpleStringStringProducer.KeyValueStringString(key, value));
        }
        producer.produce(kvList);
    }
}




. .

: MovieDirectScrapingExecutor. : run.



. .



public void run() {
    int countriesCounter = 0;
    List<String> countriesSource = Arrays.asList("us");

    while (true) {
        try {
            LocalDate localDate = LocalDate.now();

            int year = localDate.getYear();
            int month = localDate.getMonthValue();
            int day = localDate.getDayOfMonth();

            String monthString = month < 9 ? "0" + month : Integer.toString(month);
            String dayString = day < 9 ? "0" + day : Integer.toString(day);

            String startDate = year + "-" + monthString + "-" + dayString;
            String endDate = startDate;

            String language = "en";
            String countries = countriesSource.get(countriesCounter);

            execute(language, startDate, endDate, countries);

            Thread.sleep(1000);

            countriesCounter += 1;
            if (countriesCounter >= countriesSource.size()) {
                countriesCounter = 0;
            }

        } catch (InterruptedException e) {
        }
    }
}


MovieDirectScrapingExecutor, , , main.



.



{
  "base_name_url": "https:\/\/www.imdb.com\/name",
  "participant_ids": "nm7947173~nm2373827~nm0005288~nm0942193~",
  "title_id": "tt13121702",
  "rating": "0.0",
  "base_url": "https:\/\/www.imdb.com",
  "description": "It's Christmas time and Jackie (Carly Hughes), an up-and-coming journalist, finds that her life is at a crossroads until she finds an unexpected opportunity - to run a small-town newspaper ... See full summary »",
  "runtime": "",
  "title": "The Christmas Edition",
  "director_ids": "nm0838289~",
  "title_url": "\/title\/tt13121702\/?ref_=adv_li_tt",
  "director_names": "Peter Sullivan~",
  "genres": "Drama, Romance",
  "base_title_url": "https:\/\/www.imdb.com\/title",
  "participant_names": "Carly Hughes~Rob Mayes~Marie Osmond~Aloma Wright~"
}


.





, , -. kafka-.

. Apache Kafka Kafka Server.



: MovieProducerTest.



public class MovieProducerTest {
    @Test
    void simple() throws InterruptedException {
        String brokerHost = "127.0.0.1";
        int brokerPort = 29092;
        String zooKeeperHost = "127.0.0.1";
        int zooKeeperPort = 22183;
        String bootstrapServers = brokerHost + ":" + brokerPort;
        String topic = "q-data";
        String clientId = "simple";
        try (KafkaServerService kafkaServerService = new KafkaServerService(
                brokerHost, brokerPort, zooKeeperHost, zooKeeperPort
        )
        ) {
            kafkaServerService.start();
            kafkaServerService.createTopic(topic);

            MovieDirectScrapingService movieDirectScrapingServiceImpl = () -> Collections.singleton(
                    new Data.Movie(…)
            );
            MovieProducer movieProducer =
                    new MovieProducer(bootstrapServers, clientId, topic, movieDirectScrapingServiceImpl);
            movieProducer.run();

            kafkaServerService.poll(topic, "simple", 1, 5, (records) -> {
                assertTrue(records.count() > 0);
                ConsumerRecord<String, String> record = records.iterator().next();
                JSONParser jsonParser = new JSONParser();
                JSONObject jsonObject = null;
                try {
                    jsonObject = (JSONObject) jsonParser.parse(record.value());
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                assertNotNull(jsonObject);
            });

            Thread.sleep(5000);
        }
    }
}




, , . .





.




All Articles