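import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

// ClockInEvent and ClockInResult are application classes; import them from your own package.

@Configuration
public class KafkaConfig {

    // Producer for outgoing requests (ClockInEvent), serialized as JSON.
    @Bean
    public ProducerFactory<String, ClockInEvent> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    // Producer for replies (ClockInResult), used by the listener side.
    @Bean
    public ProducerFactory<String, ClockInResult> replyProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    // Consumer for incoming requests (ClockInEvent).
    @Bean
    public ConsumerFactory<String, ClockInEvent> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "attendance-group");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // "*" trusts every package for JSON deserialization; narrow this outside local development.
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    // Consumer for replies (ClockInResult), used by the requesting side.
    @Bean
    public ConsumerFactory<String, ClockInResult> replyConsumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "reply-group");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // "*" trusts every package for JSON deserialization; narrow this outside local development.
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, ClockInEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public KafkaTemplate<String, ClockInResult> replyKafkaTemplate() {
        return new KafkaTemplate<>(replyProducerFactory());
    }

    // Listener container factory; the reply template lets listener methods send their return value back.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ClockInEvent> kafkaListenerContainerFactory(
            ConsumerFactory<String, ClockInEvent> cf,
            KafkaTemplate<String, ClockInResult> replyTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, ClockInEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(cf);
        factory.setReplyTemplate(replyTemplate);
        return factory;
    }

    // Request/reply template: sends a ClockInEvent and waits for the matching ClockInResult.
    @Bean
    public ReplyingKafkaTemplate<String, ClockInEvent, ClockInResult> replyingKafkaTemplate(
            ProducerFactory<String, ClockInEvent> pf,
            ConcurrentMessageListenerContainer<String, ClockInResult> repliesContainer) {
        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    // Container that listens on the reply topic on behalf of the ReplyingKafkaTemplate.
    @Bean
    public ConcurrentMessageListenerContainer<String, ClockInResult> repliesContainer(
            ConsumerFactory<String, ClockInResult> cf) {
        ContainerProperties containerProperties = new ContainerProperties("clock-in-response-topic");
        containerProperties.setGroupId("reply-group");
        return new ConcurrentMessageListenerContainer<>(cf, containerProperties);
    }
}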